fix typing

This commit is contained in:
jxxghp
2023-06-13 23:41:50 +08:00
parent c1a22518a2
commit c06122ff19
15 changed files with 186 additions and 132 deletions

View File

@ -6,6 +6,7 @@ from ruamel.yaml import CommentedMap
from app.core.context import MediaInfo, TorrentInfo, Context
from app.core.meta import MetaBase
from app.schemas.context import TransferInfo, TransferTorrent, ExistMediaInfo
from app.utils.types import TorrentStatus, MediaType
@ -98,7 +99,7 @@ class _ModuleBase(metaclass=ABCMeta):
"""
pass
def media_exists(self, mediainfo: MediaInfo) -> Optional[dict]:
def media_exists(self, mediainfo: MediaInfo) -> Optional[ExistMediaInfo]:
"""
判断媒体文件是否存在
:param mediainfo: 识别的媒体信息
@ -156,7 +157,7 @@ class _ModuleBase(metaclass=ABCMeta):
pass
def list_torrents(self, status: TorrentStatus = None,
hashs: Union[list, str] = None) -> Optional[List[dict]]:
hashs: Union[list, str] = None) -> Optional[List[TransferTorrent]]:
"""
获取下载器种子列表
:param status: 种子状态
@ -165,7 +166,7 @@ class _ModuleBase(metaclass=ABCMeta):
"""
pass
def transfer(self, path: str, mediainfo: MediaInfo) -> Optional[dict]:
def transfer(self, path: Path, mediainfo: MediaInfo) -> Optional[TransferInfo]:
"""
转移一个路径下的文件
:param path: 文件路径
@ -174,7 +175,7 @@ class _ModuleBase(metaclass=ABCMeta):
"""
pass
def transfer_completed(self, hashs: Union[str, list], transinfo: dict) -> None:
def transfer_completed(self, hashs: Union[str, list], transinfo: TransferInfo) -> None:
"""
转移完成后的处理
:param hashs: 种子Hash
@ -191,7 +192,7 @@ class _ModuleBase(metaclass=ABCMeta):
"""
pass
def refresh_mediaserver(self, mediainfo: MediaInfo, file_path: str) -> Optional[bool]:
def refresh_mediaserver(self, mediainfo: MediaInfo, file_path: Path) -> Optional[bool]:
"""
刷新媒体库
:param mediainfo: 识别的媒体信息

View File

@ -1,10 +1,11 @@
import json
from pathlib import Path
from typing import Optional, Tuple, Union, Any
from app.core.context import MediaInfo
from app.log import logger
from app.modules import _ModuleBase
from app.modules.emby.emby import Emby
from app.schemas.context import ExistMediaInfo
from app.utils.types import MediaType
@ -31,7 +32,7 @@ class EmbyModule(_ModuleBase):
"""
return self.emby.get_webhook_message(form.get("data"))
def media_exists(self, mediainfo: MediaInfo) -> Optional[dict]:
def media_exists(self, mediainfo: MediaInfo) -> Optional[ExistMediaInfo]:
"""
判断媒体文件是否存在
:param mediainfo: 识别的媒体信息
@ -44,7 +45,7 @@ class EmbyModule(_ModuleBase):
return None
else:
logger.info(f"媒体库中已存在:{movies}")
return {"type": MediaType.MOVIE}
return ExistMediaInfo(type=MediaType.MOVIE)
else:
tvs = self.emby.get_tv_episodes(title=mediainfo.title,
year=mediainfo.year,
@ -54,9 +55,9 @@ class EmbyModule(_ModuleBase):
return None
else:
logger.info(f"{mediainfo.get_title_string()} 媒体库中已存在:{tvs}")
return {"type": MediaType.TV, "seasons": tvs}
return ExistMediaInfo(type=MediaType.TV, seasons=tvs)
def refresh_mediaserver(self, mediainfo: MediaInfo, file_path: str) -> Optional[bool]:
def refresh_mediaserver(self, mediainfo: MediaInfo, file_path: Path) -> Optional[bool]:
"""
刷新媒体库
:param mediainfo: 识别的媒体信息

View File

@ -11,6 +11,7 @@ from app.core.config import settings
from app.core.meta import MetaBase
from app.log import logger
from app.modules import _ModuleBase
from app.schemas.context import TransferInfo
from app.utils.system import SystemUtils
from app.utils.types import MediaType
@ -28,7 +29,7 @@ class FileTransferModule(_ModuleBase):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
pass
def transfer(self, path: str, mediainfo: MediaInfo) -> Optional[dict]:
def transfer(self, path: Path, mediainfo: MediaInfo) -> Optional[TransferInfo]:
"""
文件转移
:param path: 文件路径
@ -38,18 +39,14 @@ class FileTransferModule(_ModuleBase):
if not settings.LIBRARY_PATH:
logger.error("未设置媒体库目录,无法转移文件")
return None
target_path, msg = self.transfer_media(in_path=Path(path),
target_path, msg = self.transfer_media(in_path=path,
meidainfo=mediainfo,
rmt_mode=settings.TRANSFER_TYPE,
target_dir=Path(settings.LIBRARY_PATH))
if not path:
logger.error(msg)
return {
"path": path,
"target_path": target_path,
"message": msg
}
return TransferInfo(path=path, target_path=target_path, message=msg)
@staticmethod
def __transfer_command(file_item: Path, target_file: Path, rmt_mode) -> int:

View File

@ -1,10 +1,12 @@
import json
from pathlib import Path
from typing import Optional, Tuple, Union, Any
from app.core.context import MediaInfo
from app.log import logger
from app.modules import _ModuleBase
from app.modules.jellyfin.jellyfin import Jellyfin
from app.schemas.context import ExistMediaInfo
from app.utils.types import MediaType
@ -30,7 +32,7 @@ class JellyfinModule(_ModuleBase):
"""
return self.jellyfin.get_webhook_message(json.loads(body))
def media_exists(self, mediainfo: MediaInfo) -> Optional[dict]:
def media_exists(self, mediainfo: MediaInfo) -> Optional[ExistMediaInfo]:
"""
判断媒体文件是否存在
:param mediainfo: 识别的媒体信息
@ -43,7 +45,7 @@ class JellyfinModule(_ModuleBase):
return None
else:
logger.info(f"媒体库中已存在:{movies}")
return {"type": MediaType.MOVIE}
return ExistMediaInfo(type=MediaType.MOVIE)
else:
tvs = self.jellyfin.get_tv_episodes(title=mediainfo.title,
year=mediainfo.year,
@ -53,9 +55,9 @@ class JellyfinModule(_ModuleBase):
return None
else:
logger.info(f"{mediainfo.get_title_string()} 媒体库中已存在:{tvs}")
return {"type": MediaType.TV, "seasons": tvs}
return ExistMediaInfo(type=MediaType.TV, seasons=tvs)
def refresh_mediaserver(self, mediainfo: MediaInfo, file_path: str) -> Optional[bool]:
def refresh_mediaserver(self, mediainfo: MediaInfo, file_path: Path) -> Optional[bool]:
"""
刷新媒体库
:param mediainfo: 识别的媒体信息

View File

@ -1,9 +1,11 @@
from pathlib import Path
from typing import Optional, Tuple, Union, Any
from app.core.context import MediaInfo
from app.log import logger
from app.modules import _ModuleBase
from app.modules.plex.plex import Plex
from app.schemas.context import ExistMediaInfo
from app.utils.types import MediaType
@ -30,7 +32,7 @@ class PlexModule(_ModuleBase):
"""
return self.plex.get_webhook_message(form.get("payload"))
def media_exists(self, mediainfo: MediaInfo) -> Optional[dict]:
def media_exists(self, mediainfo: MediaInfo) -> Optional[ExistMediaInfo]:
"""
判断媒体文件是否存在
:param mediainfo: 识别的媒体信息
@ -43,7 +45,7 @@ class PlexModule(_ModuleBase):
return None
else:
logger.info(f"媒体库中已存在:{movies}")
return {"type": MediaType.MOVIE}
return ExistMediaInfo(type=MediaType.MOVIE)
else:
tvs = self.plex.get_tv_episodes(title=mediainfo.title,
year=mediainfo.year)
@ -52,9 +54,9 @@ class PlexModule(_ModuleBase):
return None
else:
logger.info(f"{mediainfo.get_title_string()} 媒体库中已存在:{tvs}")
return {"type": MediaType.TV, "seasons": tvs}
return ExistMediaInfo(type=MediaType.TV, seasons=tvs)
def refresh_mediaserver(self, mediainfo: MediaInfo, file_path: str) -> Optional[bool]:
def refresh_mediaserver(self, mediainfo: MediaInfo, file_path: Path) -> Optional[bool]:
"""
刷新媒体库
:param mediainfo: 识别的媒体信息

View File

@ -6,6 +6,7 @@ from app.core.metainfo import MetaInfo
from app.log import logger
from app.modules import _ModuleBase
from app.modules.qbittorrent.qbittorrent import Qbittorrent
from app.schemas.context import TransferInfo, TransferTorrent
from app.utils.string import StringUtils
from app.utils.types import TorrentStatus
@ -84,7 +85,8 @@ class QbittorrentModule(_ModuleBase):
else:
return torrent_hash, "添加下载成功"
def list_torrents(self, status: TorrentStatus = None, hashs: Union[list, str] = None) -> Optional[List[dict]]:
def list_torrents(self, status: TorrentStatus = None,
hashs: Union[list, str] = None) -> Optional[List[TransferTorrent]]:
"""
获取下载器种子列表
:param status: 种子状态
@ -101,12 +103,12 @@ class QbittorrentModule(_ModuleBase):
torrent_path = Path(content_path)
else:
torrent_path = Path(settings.DOWNLOAD_PATH) / torrent.get('name')
ret_torrents.append({
'title': torrent.get('name'),
'path': torrent_path,
'hash': torrent.get('hash'),
'tags': torrent.get('tags')
})
ret_torrents.append(TransferTorrent(
title=torrent.get('name'),
path=torrent_path,
hash=torrent.get('hash'),
tags=torrent.get('tags')
))
elif status == TorrentStatus.TRANSFER:
# 获取已完成且未整理的
torrents = self.qbittorrent.get_completed_torrents(tags=settings.TORRENT_TAG)
@ -120,17 +122,17 @@ class QbittorrentModule(_ModuleBase):
torrent_path = Path(content_path)
else:
torrent_path = Path(settings.DOWNLOAD_PATH) / torrent.get('name')
ret_torrents.append({
'title': torrent.get('name'),
'path': torrent_path,
'hash': torrent.get('hash'),
'tags': torrent.get('tags')
})
ret_torrents.append(TransferTorrent(
title=torrent.get('name'),
path=torrent_path,
hash=torrent.get('hash'),
tags=torrent.get('tags')
))
else:
return None
return ret_torrents
def transfer_completed(self, hashs: Union[str, list], transinfo: dict) -> None:
def transfer_completed(self, hashs: Union[str, list], transinfo: TransferInfo) -> None:
"""
转移完成后的处理
:param hashs: 种子Hash

View File

@ -6,6 +6,7 @@ from app.core.metainfo import MetaInfo
from app.log import logger
from app.modules import _ModuleBase
from app.modules.transmission.transmission import Transmission
from app.schemas.context import TransferInfo, TransferTorrent
from app.utils.types import TorrentStatus
@ -71,7 +72,8 @@ class TransmissionModule(_ModuleBase):
else:
return torrent_hash, "添加下载任务成功"
def list_torrents(self, status: TorrentStatus = None, hashs: Union[list, str] = None) -> Optional[List[dict]]:
def list_torrents(self, status: TorrentStatus = None,
hashs: Union[list, str] = None) -> Optional[List[TransferTorrent]]:
"""
获取下载器种子列表
:param status: 种子状态
@ -83,12 +85,12 @@ class TransmissionModule(_ModuleBase):
# 按Hash获取
torrents, _ = self.transmission.get_torrents(ids=hashs, tags=settings.TORRENT_TAG)
for torrent in torrents:
ret_torrents.append({
'title': torrent.name,
'path': Path(torrent.download_dir) / torrent.name,
'hash': torrent.hashString,
'tags': torrent.labels
})
ret_torrents.append(TransferTorrent(
title=torrent.name,
path=Path(torrent.download_dir) / torrent.name,
hash=torrent.hashString,
tags=torrent.labels
))
elif status == TorrentStatus.TRANSFER:
# 获取已完成且未整理的
torrents = self.transmission.get_completed_torrents(tags=settings.TORRENT_TAG)
@ -102,17 +104,17 @@ class TransmissionModule(_ModuleBase):
if not path:
logger.debug(f"未获取到 {torrent.name} 下载保存路径")
continue
ret_torrents.append({
'title': torrent.name,
'path': Path(path) / torrent.name,
'hash': torrent.hashString,
'tags': torrent.labels
})
ret_torrents.append(TransferTorrent(
title=torrent.name,
path=Path(torrent.download_dir) / torrent.name,
hash=torrent.hashString,
tags=torrent.labels
))
else:
return None
return ret_torrents
def transfer_completed(self, hashs: Union[str, list], transinfo: dict) -> None:
def transfer_completed(self, hashs: Union[str, list], transinfo: TransferInfo) -> None:
"""
转移完成后的处理
:param hashs: 种子Hash

View File

@ -1,7 +1,5 @@
from pathlib import Path
from typing import Tuple, Union
from app.core.context import Context
from app.modules import _ModuleBase