fix typing

This commit is contained in:
jxxghp 2023-06-06 10:31:30 +08:00
parent 774af91e3a
commit cc91177b92
10 changed files with 74 additions and 31 deletions

View File

@ -113,10 +113,15 @@ class CommonChain(_ChainBase):
if not _torrent_file:
return
# 添加下载
_hash, error_msg = self.run_module("download",
torrent_path=_torrent_file,
mediainfo=_media,
episodes=_episodes)
result: Optional[tuple] = self.run_module("download",
torrent_path=_torrent_file,
mediainfo=_media,
episodes=_episodes)
if result:
_hash, error_msg = result
else:
_hash, error_msg = None, "未知错误"
if _hash:
# 下载成功
downloaded_list.append(_context)
@ -327,6 +332,7 @@ class CommonChain(_ChainBase):
:param no_exists: 在调用该方法前已经存储的不存在的季集信息有传入时该函数搜索的内容将会叠加后输出
:return: 当前媒体是否缺失各标题总的季集和缺失的季集
"""
def __append_no_exists(_season: int, _episodes: list):
"""
添加不存在的季集信息
@ -353,14 +359,14 @@ class CommonChain(_ChainBase):
return False, {}
if mediainfo.type == MediaType.MOVIE:
# 电影
exists_movies = self.run_module("media_exists", mediainfo)
exists_movies: Optional[dict] = self.run_module("media_exists", mediainfo)
if exists_movies:
logger.info(f"媒体库中已存在电影:{mediainfo.get_title_string()}")
return True, {}
return False, {}
else:
# 电视剧
exists_tvs = self.run_module("media_exists", mediainfo)
exists_tvs: Optional[dict] = self.run_module("media_exists", mediainfo)
if not exists_tvs:
# 所有剧集均缺失
for season, episodes in mediainfo.seasons.items():

View File

@ -1,9 +1,10 @@
from pathlib import Path
from typing import Optional
from app.chain import _ChainBase
from app.chain.common import CommonChain
from app.chain.search import SearchChain
from app.core import settings, MetaInfo
from app.core import settings, MetaInfo, MediaInfo
from app.db.subscribes import Subscribes
from app.helper.rss import RssHelper
from app.log import logger
@ -53,7 +54,7 @@ class DoubanSyncChain(_ChainBase):
if not douban_id or douban_id in caches:
continue
# 根据豆瓣ID获取豆瓣数据
doubaninfo = self.run_module('douban_info', doubanid=douban_id)
doubaninfo: Optional[dict] = self.run_module('douban_info', doubanid=douban_id)
if not doubaninfo:
logger.warn(f'未获取到豆瓣信息,标题:{title}豆瓣ID{douban_id}')
continue
@ -61,7 +62,7 @@ class DoubanSyncChain(_ChainBase):
meta = MetaInfo(doubaninfo.get("original_title") or doubaninfo.get("title"))
if doubaninfo.get("year"):
meta.year = doubaninfo.get("year")
mediainfo = self.run_module('recognize_media', meta=meta)
mediainfo: MediaInfo = self.run_module('recognize_media', meta=meta)
if not mediainfo:
logger.warn(f'未识别到媒体信息,标题:{title}豆瓣ID{douban_id}')
continue

View File

@ -16,7 +16,7 @@ class IdentifyChain(_ChainBase):
"""
logger.info(f'开始识别媒体信息,标题:{title},副标题:{subtitle} ...')
# 识别前预处理
result = self.run_module('prepare_recognize', title=title, subtitle=subtitle)
result: Optional[tuple] = self.run_module('prepare_recognize', title=title, subtitle=subtitle)
if result:
title, subtitle = result
# 识别元数据

View File

@ -61,7 +61,7 @@ class SearchChain(_ChainBase):
else:
_match_torrents = torrents
# 过滤种子
result = self.run_module("filter_torrents", torrent_list=_match_torrents)
result: List[TorrentInfo] = self.run_module("filter_torrents", torrent_list=_match_torrents)
if result is not None:
_match_torrents = result
if not _match_torrents:

View File

@ -1,4 +1,4 @@
from typing import Dict, List
from typing import Dict, List, Optional
from app.chain import _ChainBase
from app.chain.common import CommonChain
@ -37,7 +37,7 @@ class SubscribeChain(_ChainBase):
"""
logger.info(f'开始添加订阅,标题:{title} ...')
# 识别前预处理
result = self.run_module('prepare_recognize', title=title)
result: Optional[tuple] = self.run_module('prepare_recognize', title=title)
if result:
title, _ = result
# 识别元数据
@ -48,7 +48,7 @@ class SubscribeChain(_ChainBase):
metainfo.type = MediaType.TV
metainfo.begin_season = season
# 识别媒体信息
mediainfo = self.run_module('recognize_media', meta=metainfo, tmdbid=tmdbid)
mediainfo: MediaInfo = self.run_module('recognize_media', meta=metainfo, tmdbid=tmdbid)
if not mediainfo:
logger.warn(f'未识别到媒体信息,标题:{title}tmdbid{tmdbid}')
return False
@ -88,7 +88,7 @@ class SubscribeChain(_ChainBase):
meta.begin_season = subscribe.season
meta.type = MediaType.MOVIE if subscribe.type == MediaType.MOVIE.value else MediaType.TV
# 识别媒体信息
mediainfo = self.run_module('recognize_media', meta=meta, tmdbid=subscribe.tmdbid)
mediainfo: MediaInfo = self.run_module('recognize_media', meta=meta, tmdbid=subscribe.tmdbid)
if not mediainfo:
logger.warn(f'未识别到媒体信息,标题:{subscribe.name}tmdbid{subscribe.tmdbid}')
continue
@ -129,7 +129,7 @@ class SubscribeChain(_ChainBase):
# 识别
meta = MetaInfo(torrent.title, torrent.description)
# 识别媒体信息
mediainfo = self.run_module('recognize_media', meta=meta)
mediainfo: MediaInfo = self.run_module('recognize_media', meta=meta)
if not mediainfo:
logger.warn(f'未识别到媒体信息,标题:{torrent.title}')
continue

View File

@ -1,4 +1,9 @@
from typing import List, Optional
from app.chain import _ChainBase
from app.core import MetaInfo, MediaInfo
from app.log import logger
from app.utils.types import TorrentStatus
class TransferChain(_ChainBase):
@ -8,9 +13,28 @@ class TransferChain(_ChainBase):
def process(self) -> bool:
"""
根据媒体信息执行搜索
获取下载器中的种子列表并执行转移
"""
logger.info("开始执行下载器文件转移 ...")
# 从下载器获取种子列表
torrents: Optional[List[dict]] = self.run_module("list_torrents", status=TorrentStatus.COMPLETE)
if not torrents:
logger.info("没有获取到已完成的下载任务")
return False
logger.info(f"获取到 {len(torrents)} 个已完成的下载任务")
# 识别
for torrent in torrents:
# 识别元数据
meta = MetaInfo(torrent.get("title"))
# 识别媒体信息
mediainfo: MediaInfo = self.run_module('recognize_media', meta=meta)
if not mediainfo:
logger.warn(f'未识别到媒体信息,标题:{torrent.get("title")}')
return False
logger.info(f"{torrent.get('title')} 识别为:{mediainfo.type.value} {mediainfo.get_title_string()}")
# 更新媒体图片
self.run_module("obtain_image", mediainfo=mediainfo)
# 转移
self.run_module("transfer", mediainfo=mediainfo, torrent=torrent)
# 转移
pass

View File

@ -31,6 +31,7 @@ class UserMessageChain(_ChainBase):
self.common = CommonChain()
self.subscribes = Subscribes()
self.searchchain = SearchChain()
self.torrent = TorrentHelper()
def process(self, request: Request, *args, **kwargs) -> None:
"""
@ -129,21 +130,28 @@ class UserMessageChain(_ChainBase):
else:
# 下载种子
torrent: TorrentInfo = cache_list[int(text) - 1]
# 识别种子信息
meta = MetaInfo(torrent.title)
# 预处理种子
torrent_file, msg = self.run_module("prepare_torrent", torrentinfo=torrent)
meta: MetaBase = MetaInfo(torrent.title)
torrent_file, _, _, _, error_msg = self.torrent.download_torrent(
url=torrent.enclosure,
cookie=torrent.site_cookie,
ua=torrent.site_ua,
proxy=torrent.site_proxy)
if not torrent_file:
# 下载失败
logger.error(f"下载种子文件失败:{torrent.title} - {torrent.enclosure}")
self.run_module('post_message',
title=f"{torrent.title} 种子下载失败!",
text=f"错误信息:{msg}\n种子链接:{torrent.enclosure}",
text=f"错误信息:{error_msg}\n种子链接:{torrent.enclosure}",
userid=userid)
return
# 添加下载
state, msg = self.run_module("download_torrent",
torrent_path=torrent_file,
mediainfo=self._current_media)
result: Optional[tuple] = self.run_module("download",
torrent_path=torrent_file,
mediainfo=self._current_media)
if result:
state, msg = result
else:
state, msg = False, "未知错误"
# 发送消息
if not state:
# 下载失败
self.common.post_message(title=f"{torrent.title} 添加下载失败!",
@ -234,7 +242,7 @@ class UserMessageChain(_ChainBase):
meta.year = year
self._current_meta = meta
# 开始搜索
medias = self.run_module('search_medias', meta=meta)
medias: Optional[List[MediaInfo]] = self.run_module('search_medias', meta=meta)
if not medias:
self.common.post_message(title=f"{meta.get_name()} 没有找到对应的媒体信息!", userid=userid)
return

View File

@ -1,5 +1,3 @@
from typing import Any
from app.chain import _ChainBase
@ -13,7 +11,7 @@ class WebhookMessageChain(_ChainBase):
处理Webhook报文并发送消息
"""
# 获取主体内容
info = self.run_module('webhook_parser', message=message)
info: dict = self.run_module('webhook_parser', message=message)
if not info:
return
# 发送消息

View File

@ -6,6 +6,7 @@ from fastapi import Request
from app.core.context import MediaInfo, TorrentInfo
from app.core.meta import MetaBase
from app.utils.types import TorrentStatus
class _ModuleBase(metaclass=ABCMeta):
@ -130,7 +131,7 @@ class _ModuleBase(metaclass=ABCMeta):
"""
pass
def list_torrents(self, status: Union[str, list]) -> List[dict]:
def list_torrents(self, status: TorrentStatus) -> List[dict]:
"""
获取下载器种子列表
:param status: 种子状态

View File

@ -7,6 +7,11 @@ class MediaType(Enum):
UNKNOWN = '未知'
class TorrentStatus(Enum):
COMPLETE = "已完成"
DOWNLOADING = "下载中"
# 可监听事件
class EventType(Enum):
# 插件重载