fix typing

This commit is contained in:
jxxghp 2023-06-14 09:15:13 +08:00
parent 9d2d3f58a2
commit 4284358093
3 changed files with 69 additions and 70 deletions

View File

@ -58,20 +58,14 @@ class DownloadChain(ChainBase):
def batch_download(self,
contexts: List[Context],
need_tvs: Dict[int, List[NotExistMediaInfo]] = None,
no_exists: Dict[int, Dict[int, NotExistMediaInfo]] = None,
userid: str = None) -> Tuple[List[Context], dict]:
"""
根据缺失数据自动种子列表中组合择优下载
:param contexts: 资源上下文列表
:param need_tvs: 缺失的剧集信息
:param no_exists: 缺失的剧集信息
:param userid: 用户ID
:return: 已经下载的资源列表剩余未下载到的剧集 no_exists[mediainfo.tmdb_id] = [
{
"season": season,
"episodes": episodes,
"total_episodes": len(episodes)
}
]
:return: 已经下载的资源列表剩余未下载到的剧集 no_exists[tmdb_id] = {season: NotExistMediaInfo}
"""
# 已下载的项目
downloaded_list: List[Context] = []
@ -137,45 +131,54 @@ class DownloadChain(ChainBase):
userid=userid)
return _hash
def __update_seasons(tmdbid: int, need: list, current: list) -> list:
def __update_seasons(_tmdbid: int, _need: list, _current: list) -> list:
"""
更新need_tvs季数返回剩余季数
:param _tmdbid: TMDBID
:param _need: 需要下载的季数
:param _current: 已经下载的季数
"""
need = list(set(need).difference(set(current)))
for cur in current:
for nt in need_tvs.get(tmdbid):
if cur == nt.get("season") or (cur == 1 and not nt.get("season")):
need_tvs[tmdbid].remove(nt)
if not need_tvs.get(tmdbid) and need_tvs.get(tmdbid) is not None:
need_tvs.pop(tmdbid)
# 剩余季数
need = list(set(_need).difference(set(_current)))
# 清除已下载的季信息
for _sea in list(no_exists.get(_tmdbid)):
if _sea not in need:
no_exists[_tmdbid].pop(_sea)
if not no_exists.get(_tmdbid) and no_exists.get(_tmdbid) is not None:
no_exists.pop(_tmdbid)
return need
def __update_episodes(tmdbid: int, seq: int, need: list, current: set) -> list:
def __update_episodes(_tmdbid: int, _sea: int, _need: list, _current: set) -> list:
"""
更新need_tvs集数返回剩余集数
:param _tmdbid: TMDBID
:param _sea: 季数
:param _need: 需要下载的集数
:param _current: 已经下载的集数
"""
need = list(set(need).difference(set(current)))
# 剩余集数
need = list(set(_need).difference(set(_current)))
if need:
not_exist = need_tvs[tmdbid][seq]
need_tvs[tmdbid][seq] = NotExistMediaInfo(
not_exist = no_exists[_tmdbid][_sea]
no_exists[_tmdbid][_sea] = NotExistMediaInfo(
season=not_exist.season,
episodes=need,
total_episodes=not_exist.total_episodes,
start_episode=not_exist.start_episode
)
else:
need_tvs[tmdbid].pop(seq)
if not need_tvs.get(tmdbid) and need_tvs.get(tmdbid) is not None:
need_tvs.pop(tmdbid)
no_exists[_tmdbid].pop(_sea)
if not no_exists.get(_tmdbid) and no_exists.get(_tmdbid) is not None:
no_exists.pop(_tmdbid)
return need
def __get_season_episodes(tmdbid: int, season: int) -> int:
"""
获取需要的季的集数
"""
if not need_tvs.get(tmdbid):
if not no_exists.get(tmdbid):
return 0
for nt in need_tvs.get(tmdbid):
for nt in no_exists.get(tmdbid):
if season == nt.get("season"):
return nt.get("total_episodes")
return 0
@ -189,11 +192,11 @@ class DownloadChain(ChainBase):
__download(context)
# 电视剧整季匹配
if need_tvs:
if no_exists:
# 先把整季缺失的拿出来,看是否刚好有所有季都满足的种子
need_seasons: Dict[int, list] = {}
for need_tmdbid, need_tv in need_tvs.items():
for tv in need_tv:
for need_tmdbid, need_tv in no_exists.items():
for tv in need_tv.values():
if not tv:
continue
if not tv.episodes:
@ -231,18 +234,17 @@ class DownloadChain(ChainBase):
if download_id:
# 更新仍需季集
need_season = __update_seasons(tmdbid=need_tmdbid,
need=need_season,
current=item_season)
need_season = __update_seasons(_tmdbid=need_tmdbid,
_need=need_season,
_current=item_season)
# 电视剧季内的集匹配
if need_tvs:
need_tv_list = list(need_tvs)
if no_exists:
need_tv_list = list(no_exists)
for need_tmdbid in need_tv_list:
need_tv = need_tvs.get(need_tmdbid)
need_tv = no_exists.get(need_tmdbid)
if not need_tv:
continue
index = 0
for tv in need_tv:
for sea, tv in need_tv.items():
need_season = tv.season or 1
need_episodes = tv.episodes
total_episodes = tv.total_episodes
@ -270,21 +272,19 @@ class DownloadChain(ChainBase):
download_id = __download(context)
if download_id:
# 更新仍需集数
need_episodes = __update_episodes(tmdbid=need_tmdbid,
need=need_episodes,
seq=index,
current=item_episodes)
index += 1
need_episodes = __update_episodes(_tmdbid=need_tmdbid,
_need=need_episodes,
_sea=need_season,
_current=item_episodes)
# 仍然缺失的剧集从整季中选择需要的集数文件下载仅支持QB和TR
if need_tvs:
need_tv_list = list(need_tvs)
if no_exists:
need_tv_list = list(no_exists)
for need_tmdbid in need_tv_list:
need_tv = need_tvs.get(need_tmdbid)
need_tv = no_exists.get(need_tmdbid)
if not need_tv:
continue
index = 0
for tv in need_tv:
for sea, tv in need_tv.items():
need_season = tv.season or 1
need_episodes = tv.episodes
if not need_episodes:
@ -323,19 +323,18 @@ class DownloadChain(ChainBase):
if not download_id:
continue
# 更新仍需集数
need_episodes = __update_episodes(tmdbid=need_tmdbid,
need=need_episodes,
seq=index,
current=selected_episodes)
index += 1
need_episodes = __update_episodes(_tmdbid=need_tmdbid,
_need=need_episodes,
_sea=need_season,
_current=selected_episodes)
# 返回下载的资源,剩下没下完的
return downloaded_list, need_tvs
return downloaded_list, no_exists
def get_no_exists_info(self, meta: MetaBase,
mediainfo: MediaInfo,
no_exists: Dict[int, List[NotExistMediaInfo]] = None
) -> Tuple[bool, Dict[int, List[NotExistMediaInfo]]]:
no_exists: Dict[int, Dict[int, NotExistMediaInfo]] = None
) -> Tuple[bool, Dict[int, Dict[int, NotExistMediaInfo]]]:
"""
检查媒体库查询是否存在对于剧集同时返回不存在的季集信息
:param meta: 元数据
@ -355,20 +354,20 @@ class DownloadChain(ChainBase):
]}
"""
if not no_exists.get(mediainfo.tmdb_id):
no_exists[mediainfo.tmdb_id] = [
NotExistMediaInfo(
no_exists[mediainfo.tmdb_id] = {
_season: NotExistMediaInfo(
season=_season,
episodes=_episodes,
total_episodes=_total,
start_episode=_start)
]
start_episode=_start
)
}
else:
no_exists[mediainfo.tmdb_id].append(
NotExistMediaInfo(
no_exists[mediainfo.tmdb_id][_season] = NotExistMediaInfo(
season=_season,
episodes=_episodes,
total_episodes=_total,
start_episode=_start)
start_episode=_start
)
if not no_exists:

View File

@ -23,7 +23,7 @@ class SearchChain(ChainBase):
def process(self, meta: MetaBase, mediainfo: MediaInfo,
keyword: str = None,
no_exists: Dict[int, List[NotExistMediaInfo]] = None) -> Optional[List[Context]]:
no_exists: Dict[int, Dict[int, NotExistMediaInfo]] = None) -> Optional[List[Context]]:
"""
根据媒体信息执行搜索
:param meta: 元数据
@ -55,10 +55,10 @@ class SearchChain(ChainBase):
logger.error(f'媒体信息识别失败!')
return []
# 缺失的媒体信息
if no_exists:
if no_exists and no_exists.get(mediainfo.tmdb_id):
# 过滤剧集
season_episodes = {info.get('season'): info.get('episodes')
for info in no_exists.get(mediainfo.tmdb_id)}
season_episodes = {sea: info.episodes
for sea, info in no_exists[mediainfo.tmdb_id].items()}
else:
season_episodes = None
# 执行搜索

View File

@ -291,7 +291,7 @@ class SubscribeChain(ChainBase):
})
@staticmethod
def __get_subscribe_no_exits(no_exists: Dict[int, List[NotExistMediaInfo]],
def __get_subscribe_no_exits(no_exists: Dict[int, Dict[int, NotExistMediaInfo]],
tmdb_id: int,
begin_season: int,
total_episode: int,