fix plugin api

This commit is contained in:
thsrite 2023-10-28 11:22:37 +08:00
parent db3040a50e
commit bbc4a1bfa5
3 changed files with 46 additions and 13 deletions

View File

@ -9,6 +9,7 @@ import pytz
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.cron import CronTrigger
from app import schemas
from app.core.config import settings from app.core.config import settings
from app.plugins import _PluginBase from app.plugins import _PluginBase
from typing import Any, List, Dict, Tuple, Optional from typing import Any, List, Dict, Tuple, Optional
@ -67,7 +68,7 @@ class AutoBackup(_PluginBase):
if self._cron: if self._cron:
try: try:
self._scheduler.add_job(func=self.backup, self._scheduler.add_job(func=self.__backup,
trigger=CronTrigger.from_crontab(self._cron), trigger=CronTrigger.from_crontab(self._cron),
name="自动备份") name="自动备份")
except Exception as err: except Exception as err:
@ -75,7 +76,7 @@ class AutoBackup(_PluginBase):
if self._onlyonce: if self._onlyonce:
logger.info(f"自动备份服务启动,立即运行一次") logger.info(f"自动备份服务启动,立即运行一次")
self._scheduler.add_job(func=self.backup, trigger='date', self._scheduler.add_job(func=self.__backup, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3), run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="自动备份") name="自动备份")
# 关闭一次性开关 # 关闭一次性开关
@ -93,7 +94,7 @@ class AutoBackup(_PluginBase):
self._scheduler.print_jobs() self._scheduler.print_jobs()
self._scheduler.start() self._scheduler.start()
def backup(self): def __backup(self):
""" """
自动备份删除备份 自动备份删除备份
""" """
@ -103,12 +104,16 @@ class AutoBackup(_PluginBase):
bk_path = self.get_data_path() bk_path = self.get_data_path()
# 备份 # 备份
zip_file = self.backup(bk_path=bk_path) zip_file = self.backup_file(bk_path=bk_path)
if zip_file: if zip_file:
logger.info(f"备份完成 备份文件 {zip_file} ") success = True
msg = f"备份完成 备份文件 {zip_file}"
logger.info(msg)
else: else:
logger.error("创建备份失败") success = False
msg = "创建备份失败"
logger.error(msg)
# 清理备份 # 清理备份
bk_cnt = 0 bk_cnt = 0
@ -140,8 +145,10 @@ class AutoBackup(_PluginBase):
f"清理备份数量 {del_cnt}\n" f"清理备份数量 {del_cnt}\n"
f"剩余备份数量 {bk_cnt - del_cnt}") f"剩余备份数量 {bk_cnt - del_cnt}")
return success, msg
@staticmethod @staticmethod
def backup(bk_path: Path = None): def backup_file(bk_path: Path = None):
""" """
@param bk_path 自定义备份路径 @param bk_path 自定义备份路径
""" """
@ -175,12 +182,22 @@ class AutoBackup(_PluginBase):
def get_api(self) -> List[Dict[str, Any]]: def get_api(self) -> List[Dict[str, Any]]:
return [{ return [{
"path": "/backup", "path": "/backup",
"endpoint": self.backup, "endpoint": self.__backup,
"methods": ["GET"], "methods": ["GET"],
"summary": "MoviePilot备份", "summary": "MoviePilot备份",
"description": "MoviePilot备份", "description": "MoviePilot备份",
}] }]
def backup(self) -> schemas.Response:
"""
API调用备份
"""
success, msg = self.__backup()
return schemas.Response(
success=success,
message=msg
)
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]: def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
""" """
拼装插件配置页面需要返回两块数据1页面配置2数据结构 拼装插件配置页面需要返回两块数据1页面配置2数据结构

View File

@ -13,6 +13,7 @@ from apscheduler.triggers.cron import CronTrigger
from python_hosts import Hosts, HostsEntry from python_hosts import Hosts, HostsEntry
from requests import Response from requests import Response
from app import schemas
from app.core.config import settings from app.core.config import settings
from app.core.event import eventmanager, Event from app.core.event import eventmanager, Event
from app.log import logger from app.log import logger
@ -88,13 +89,13 @@ class CloudflareSpeedTest(_PluginBase):
try: try:
if self.get_state() and self._cron: if self.get_state() and self._cron:
logger.info(f"Cloudflare CDN优选服务启动周期{self._cron}") logger.info(f"Cloudflare CDN优选服务启动周期{self._cron}")
self._scheduler.add_job(func=self.cloudflareSpeedTest, self._scheduler.add_job(func=self.__cloudflareSpeedTest,
trigger=CronTrigger.from_crontab(self._cron), trigger=CronTrigger.from_crontab(self._cron),
name="Cloudflare优选") name="Cloudflare优选")
if self._onlyonce: if self._onlyonce:
logger.info(f"Cloudflare CDN优选服务启动立即运行一次") logger.info(f"Cloudflare CDN优选服务启动立即运行一次")
self._scheduler.add_job(func=self.cloudflareSpeedTest, trigger='date', self._scheduler.add_job(func=self.__cloudflareSpeedTest, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3), run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="Cloudflare优选") name="Cloudflare优选")
# 关闭一次性开关 # 关闭一次性开关
@ -111,7 +112,7 @@ class CloudflareSpeedTest(_PluginBase):
self._scheduler.start() self._scheduler.start()
@eventmanager.register(EventType.CloudFlareSpeedTest) @eventmanager.register(EventType.CloudFlareSpeedTest)
def cloudflareSpeedTest(self, event: Event = None): def __cloudflareSpeedTest(self, event: Event = None):
""" """
CloudflareSpeedTest优选 CloudflareSpeedTest优选
""" """
@ -491,7 +492,7 @@ class CloudflareSpeedTest(_PluginBase):
def get_api(self) -> List[Dict[str, Any]]: def get_api(self) -> List[Dict[str, Any]]:
return [{ return [{
"path": "/cloudflare_speedtest", "path": "/cloudflare_speedtest",
"endpoint": self.cloudflareSpeedTest, "endpoint": self.cloudflare_speedtest,
"methods": ["GET"], "methods": ["GET"],
"summary": "Cloudflare IP优选", "summary": "Cloudflare IP优选",
"description": "Cloudflare IP优选", "description": "Cloudflare IP优选",
@ -726,6 +727,13 @@ class CloudflareSpeedTest(_PluginBase):
def get_page(self) -> List[dict]: def get_page(self) -> List[dict]:
pass pass
def cloudflare_speedtest(self) -> schemas.Response:
"""
API调用CloudflareSpeedTest IP优选
"""
self.__cloudflareSpeedTest()
return schemas.Response(success=True)
@staticmethod @staticmethod
def __read_system_hosts(): def __read_system_hosts():
""" """

View File

@ -12,6 +12,7 @@ from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver from watchdog.observers.polling import PollingObserver
from app import schemas
from app.chain.tmdb import TmdbChain from app.chain.tmdb import TmdbChain
from app.chain.transfer import TransferChain from app.chain.transfer import TransferChain
from app.core.config import settings from app.core.config import settings
@ -602,12 +603,19 @@ class DirMonitor(_PluginBase):
def get_api(self) -> List[Dict[str, Any]]: def get_api(self) -> List[Dict[str, Any]]:
return [{ return [{
"path": "/directory_sync", "path": "/directory_sync",
"endpoint": self.sync_all, "endpoint": self.sync,
"methods": ["GET"], "methods": ["GET"],
"summary": "目录监控同步", "summary": "目录监控同步",
"description": "目录监控同步", "description": "目录监控同步",
}] }]
def sync(self) -> schemas.Response:
"""
API调用目录同步
"""
self.sync_all()
return schemas.Response(success=True)
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]: def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
return [ return [
{ {