From b181af40cd50a3b58e3d50c15e5933665f8bc661 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 19 Jun 2023 18:38:59 +0800 Subject: [PATCH] =?UTF-8?q?add=20=E5=AE=9E=E6=97=B6=E8=BF=9B=E5=BA=A6API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/apiv1.py | 3 +- app/api/endpoints/system.py | 25 +++++++ app/chain/__init__.py | 18 ++--- app/chain/search.py | 128 +++++++++++++++++++++++--------- app/chain/subscribe.py | 2 +- app/chain/transfer.py | 28 ++++++- app/helper/progress.py | 52 +++++++++++++ app/modules/indexer/__init__.py | 45 ++--------- app/schemas/types.py | 8 ++ 9 files changed, 220 insertions(+), 89 deletions(-) create mode 100644 app/api/endpoints/system.py create mode 100644 app/helper/progress.py diff --git a/app/api/apiv1.py b/app/api/apiv1.py index 9cdaadd4..c7ba7ebc 100644 --- a/app/api/apiv1.py +++ b/app/api/apiv1.py @@ -1,7 +1,7 @@ from fastapi import APIRouter from app.api.endpoints import login, user, site, message, webhook, subscribe, \ - media, douban, search, plugin, tmdb, history + media, douban, search, plugin, tmdb, history, system api_router = APIRouter() api_router.include_router(login.router, tags=["login"]) @@ -15,4 +15,5 @@ api_router.include_router(search.router, prefix="/search", tags=["search"]) api_router.include_router(douban.router, prefix="/douban", tags=["douban"]) api_router.include_router(tmdb.router, prefix="/tmdb", tags=["tmdb"]) api_router.include_router(history.router, prefix="/history", tags=["history"]) +api_router.include_router(system.router, prefix="/system", tags=["system"]) api_router.include_router(plugin.router, prefix="/plugin", tags=["plugin"]) diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py new file mode 100644 index 00000000..5e45ea0a --- /dev/null +++ b/app/api/endpoints/system.py @@ -0,0 +1,25 @@ +import asyncio +import json + +from fastapi import APIRouter +from fastapi.responses import StreamingResponse + +from app.helper.progress import ProgressHelper + +router = APIRouter() + + +@router.get("/progress/{process_type}", summary="实时进度") +async def get_progress(process_type: str): + """ + 实时获取处理进度,返回格式为SSE + """ + progress = ProgressHelper() + + async def event_generator(): + while True: + detail = progress.get(process_type) + yield 'data: %s\n\n' % json.dumps(detail) + await asyncio.sleep(0.2) + + return StreamingResponse(event_generator(), media_type="text/event-stream") diff --git a/app/chain/__init__.py b/app/chain/__init__.py index ebc4199f..c9bae441 100644 --- a/app/chain/__init__.py +++ b/app/chain/__init__.py @@ -148,24 +148,24 @@ class ChainBase(AbstractSingleton, metaclass=Singleton): """ return self.run_module("search_medias", meta=meta) - def search_torrents(self, mediainfo: Optional[MediaInfo], sites: List[CommentedMap], - keyword: str = None) -> Optional[List[TorrentInfo]]: + def search_torrents(self, mediainfo: Optional[MediaInfo], site: CommentedMap, + keyword: str = None) -> List[TorrentInfo]: """ - 搜索站点,多个站点需要多线程处理 + 搜索一个站点的种子资源 :param mediainfo: 识别的媒体信息 - :param sites: 站点列表 + :param site: 站点 :param keyword: 搜索关键词,如有按关键词搜索,否则按媒体信息名称搜索 :reutrn: 资源列表 """ - return self.run_module("search_torrents", mediainfo=mediainfo, sites=sites, keyword=keyword) + return self.run_module("search_torrents", mediainfo=mediainfo, site=site, keyword=keyword) - def refresh_torrents(self, sites: List[CommentedMap]) -> Optional[List[TorrentInfo]]: + def refresh_torrents(self, site: CommentedMap) -> List[TorrentInfo]: """ 获取站点最新一页的种子,多个站点需要多线程处理 - :param sites: 站点列表 + :param site: 站点 :reutrn: 种子资源列表 """ - return self.run_module("refresh_torrents", sites=sites) + return self.run_module("refresh_torrents", site=site) def filter_torrents(self, torrent_list: List[TorrentInfo], season_episodes: Dict[int, list] = None) -> List[TorrentInfo]: @@ -284,7 +284,7 @@ class ChainBase(AbstractSingleton, metaclass=Singleton): :return: 成功或失败 """ return self.run_module("post_torrents_message", title=title, mediainfo=mediainfo, - items=items, userid=userid) + items=items, userid=userid) def scrape_metadata(self, path: Path, mediainfo: MediaInfo) -> None: """ diff --git a/app/chain/search.py b/app/chain/search.py index f415f143..c0c2ef11 100644 --- a/app/chain/search.py +++ b/app/chain/search.py @@ -1,13 +1,18 @@ -from typing import Optional, List, Dict +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime +from typing import Dict +from typing import List, Optional from app.chain import ChainBase from app.core.config import settings -from app.core.context import Context, MediaInfo, TorrentInfo +from app.core.context import Context +from app.core.context import MediaInfo, TorrentInfo from app.core.metainfo import MetaInfo +from app.helper.progress import ProgressHelper from app.helper.sites import SitesHelper from app.log import logger from app.schemas import NotExistMediaInfo -from app.schemas.types import MediaType +from app.schemas.types import MediaType, ProgressKey from app.utils.string import StringUtils @@ -19,6 +24,7 @@ class SearchChain(ChainBase): def __init__(self): super().__init__() self.siteshelper = SitesHelper() + self.progress = ProgressHelper() def search_by_tmdbid(self, tmdbid: int, mtype: str = None) -> Optional[List[Context]]: """ @@ -32,24 +38,13 @@ class SearchChain(ChainBase): return None return self.process(mediainfo=mediainfo) - def search_by_title(self, title: str, site_ids: List[int] = None) -> List[TorrentInfo]: + def search_by_title(self, title: str) -> List[TorrentInfo]: """ 根据标题搜索资源,不识别不过滤,直接返回站点内容 """ logger.info(f'开始搜索资源,关键词:{title} ...') - # 未开启的站点不搜索 - indexer_sites = [] - for indexer in self.siteshelper.get_indexers(): - if not settings.INDEXER_SITES \ - or any([s in indexer.get("domain") for s in settings.INDEXER_SITES.split(',')]): - if site_ids and indexer.get("id") not in site_ids: - continue - indexer_sites.append(indexer) - if not indexer_sites: - logger.warn('未开启任何有效站点,无法搜索资源') - return [] # 搜索 - return self.search_torrents(mediainfo=None, sites=indexer_sites, keyword=title) + return self.__search_all_sites(keyword=title) def process(self, mediainfo: MediaInfo, keyword: str = None, @@ -61,20 +56,6 @@ class SearchChain(ChainBase): :param no_exists: 缺失的媒体信息 """ logger.info(f'开始搜索资源,关键词:{keyword or mediainfo.title} ...') - # 未开启的站点不搜索 - indexer_sites = [] - for indexer in self.siteshelper.get_indexers(): - if not settings.INDEXER_SITES \ - or any([s in indexer.get("domain") for s in settings.INDEXER_SITES.split(',')]): - # 站点流控 - state, msg = self.siteshelper.check(indexer.get("domain")) - if not state: - logger.warn(msg) - continue - indexer_sites.append(indexer) - if not indexer_sites: - logger.warn('未开启任何有效站点,无法搜索资源') - return [] # 补充媒体信息 if not mediainfo.names: mediainfo: MediaInfo = self.recognize_media(mtype=mediainfo.type, @@ -90,10 +71,9 @@ class SearchChain(ChainBase): else: season_episodes = None # 执行搜索 - torrents: List[TorrentInfo] = self.search_torrents( + torrents: List[TorrentInfo] = self.__search_all_sites( mediainfo=mediainfo, - keyword=keyword, - sites=indexer_sites + keyword=keyword ) if not torrents: logger.warn(f'{keyword or mediainfo.title} 未搜索到资源') @@ -107,11 +87,21 @@ class SearchChain(ChainBase): if not torrents: logger.warn(f'{keyword or mediainfo.title} 没有符合过滤条件的资源') return [] - # 过滤不匹配的资源 - logger.info(f'开始匹配,总 {len(torrents)} 个资源 ...') + # 匹配的资源 _match_torrents = [] + # 总数 + _total = len(torrents) + # 已处理数 + _count = 0 if mediainfo: + self.progress.start(ProgressKey.Search) + logger.info(f'开始匹配,总 {_total} 个资源 ...') + self.progress.update(value=0, text=f'开始匹配,总 {_total} 个资源 ...', key=ProgressKey.Search) for torrent in torrents: + _count += 1 + self.progress.update(value=(_count / _total) * 100, + text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...', + key=ProgressKey.Search) # 比对IMDBID if torrent.imdbid \ and mediainfo.imdb_id \ @@ -149,6 +139,10 @@ class SearchChain(ChainBase): logger.info(f'{mediainfo.title} 匹配到资源:{torrent.site_name} - {torrent.title}') _match_torrents.append(torrent) break + self.progress.update(value=100, + text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源', + key=ProgressKey.Search) + self.progress.end(ProgressKey.Search) else: _match_torrents = torrents logger.info(f"匹配完成,共匹配到 {len(_match_torrents)} 个资源") @@ -156,3 +150,67 @@ class SearchChain(ChainBase): return [Context(meta=MetaInfo(title=torrent.title, subtitle=torrent.description), mediainfo=mediainfo, torrentinfo=torrent) for torrent in _match_torrents] + + def __search_all_sites(self, mediainfo: Optional[MediaInfo] = None, + keyword: str = None) -> Optional[List[TorrentInfo]]: + """ + 多线程搜索多个站点 + :param mediainfo: 识别的媒体信息 + :param keyword: 搜索关键词,如有按关键词搜索,否则按媒体信息名称搜索 + :reutrn: 资源列表 + """ + # 未开启的站点不搜索 + indexer_sites = [] + for indexer in self.siteshelper.get_indexers(): + if not settings.INDEXER_SITES \ + or any([s in indexer.get("domain") for s in settings.INDEXER_SITES.split(',')]): + # 站点流控 + state, msg = self.siteshelper.check(indexer.get("domain")) + if not state: + logger.warn(msg) + continue + indexer_sites.append(indexer) + if not indexer_sites: + logger.warn('未开启任何有效站点,无法搜索资源') + return [] + # 开始进度 + self.progress.start(ProgressKey.Search) + # 开始计时 + start_time = datetime.now() + # 总数 + total_num = len(indexer_sites) + # 完成数 + finish_count = 0 + # 更新进度 + self.progress.update(value=0, + text=f"开始搜索,共 {total_num} 个站点 ...", + key=ProgressKey.Search) + # 多线程 + executor = ThreadPoolExecutor(max_workers=len(indexer_sites)) + all_task = [] + for site in indexer_sites: + task = executor.submit(self.search_torrents, mediainfo=mediainfo, + site=site, keyword=keyword) + all_task.append(task) + # 结果集 + results = [] + for future in as_completed(all_task): + finish_count += 1 + result = future.result() + if result: + results += result + logger.info(f"站点搜索进度:{finish_count} / {total_num}") + self.progress.update(value=finish_count / total_num * 100, + text=f"正在搜索,已完成 {finish_count} / {total_num} 个站点 ...", + key=ProgressKey.Search) + # 计算耗时 + end_time = datetime.now() + # 更新进度 + self.progress.update(value=100, + text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒", + key=ProgressKey.Search) + logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒") + # 结束进度 + self.progress.end(ProgressKey.Search) + # 返回 + return results diff --git a/app/chain/subscribe.py b/app/chain/subscribe.py index 90403f7c..85965c3a 100644 --- a/app/chain/subscribe.py +++ b/app/chain/subscribe.py @@ -237,7 +237,7 @@ class SubscribeChain(ChainBase): continue logger.info(f'开始刷新站点资源,站点:{indexer.get("name")} ...') domain = StringUtils.get_url_domain(indexer.get("domain")) - torrents: List[TorrentInfo] = self.refresh_torrents(sites=[indexer]) + torrents: List[TorrentInfo] = self.refresh_torrents(site=indexer) if torrents: self._torrents_cache[domain] = [] # 过滤种子 diff --git a/app/chain/transfer.py b/app/chain/transfer.py index cee79172..948574f5 100644 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -1,5 +1,4 @@ import re -from pathlib import Path from typing import List, Optional, Union from app.chain import ChainBase @@ -10,9 +9,10 @@ from app.core.metainfo import MetaInfo from app.db.downloadhistory_oper import DownloadHistoryOper from app.db.models.downloadhistory import DownloadHistory from app.db.transferhistory_oper import TransferHistoryOper +from app.helper.progress import ProgressHelper from app.log import logger from app.schemas import TransferInfo, TransferTorrent -from app.schemas.types import TorrentStatus, EventType, MediaType +from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey from app.utils.string import StringUtils @@ -25,6 +25,7 @@ class TransferChain(ChainBase): super().__init__() self.downloadhis = DownloadHistoryOper() self.transferhis = TransferHistoryOper() + self.progress = ProgressHelper() def process(self, arg_str: str = None, userid: Union[str, int] = None) -> bool: """ @@ -68,8 +69,20 @@ class TransferChain(ChainBase): return False logger.info(f"获取到 {len(torrents)} 个已完成的下载任务") - # 识别 + # 开始进度 + self.progress.start(ProgressKey.FileTransfer) + # 总数 + total_num = len(torrents) + # 已处理数量 + processed_num = 0 + self.progress.update(value=0, + text=f"开始转移下载任务文件,共 {total_num} 个任务 ...", + key=ProgressKey.FileTransfer) for torrent in torrents: + # 更新进度 + self.progress.update(value=processed_num / total_num * 100, + text=f"正在转移 {torrent.title} ...", + key=ProgressKey.FileTransfer) # 识别前预处理 result: Optional[tuple] = self.prepare_recognize(title=torrent.title) if result: @@ -145,7 +158,14 @@ class TransferChain(ChainBase): 'mediainfo': mediainfo, 'transferinfo': transferinfo }) - + # 计数 + processed_num += 1 + # 更新进度 + self.progress.update(value=processed_num / total_num * 100, + text=f"{torrent.title} 转移完成", + key=ProgressKey.FileTransfer) + # 结束进度 + self.progress.end(ProgressKey.FileTransfer) logger.info("下载器文件转移执行完成") return True diff --git a/app/helper/progress.py b/app/helper/progress.py new file mode 100644 index 00000000..4d1004ce --- /dev/null +++ b/app/helper/progress.py @@ -0,0 +1,52 @@ +from enum import Enum +from typing import Union, Dict + +from app.schemas.types import ProgressKey +from app.utils.singleton import Singleton + + +class ProgressHelper(metaclass=Singleton): + _process_detail: Dict[str, dict] = {} + + def __init__(self): + self._process_detail = {} + + def init_config(self): + pass + + def __reset(self, key: Union[ProgressKey, str]): + if isinstance(key, Enum): + key = key.value + self._process_detail[key] = { + "enable": False, + "value": 0, + "text": "请稍候..." + } + + def start(self, key: Union[ProgressKey, str]): + self.__reset(key) + if isinstance(key, Enum): + key = key.value + self._process_detail[key]['enable'] = True + + def end(self, key: Union[ProgressKey, str]): + if isinstance(key, Enum): + key = key.value + if not self._process_detail.get(key): + return + self._process_detail[key]['enable'] = False + + def update(self, key: Union[ProgressKey, str], value: float = None, text: str = None): + if isinstance(key, Enum): + key = key.value + if not self._process_detail.get(key, {}).get('enable'): + return + if value: + self._process_detail[key]['value'] = value + if text: + self._process_detail[key]['text'] = text + + def get(self, key: Union[ProgressKey, str]) -> dict: + if isinstance(key, Enum): + key = key.value + return self._process_detail.get(key) diff --git a/app/modules/indexer/__init__.py b/app/modules/indexer/__init__.py index ac5ba3e6..309ef09b 100644 --- a/app/modules/indexer/__init__.py +++ b/app/modules/indexer/__init__.py @@ -1,4 +1,3 @@ -from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from typing import List, Optional, Tuple, Union @@ -10,8 +9,8 @@ from app.modules import _ModuleBase from app.modules.indexer.spider import TorrentSpider from app.modules.indexer.tnode import TNodeSpider from app.modules.indexer.torrentleech import TorrentLeech -from app.utils.string import StringUtils from app.schemas.types import MediaType +from app.utils.string import StringUtils class IndexerModule(_ModuleBase): @@ -28,40 +27,8 @@ class IndexerModule(_ModuleBase): def init_setting(self) -> Tuple[str, Union[str, bool]]: return "INDEXER", "builtin" - def search_torrents(self, mediainfo: Optional[MediaInfo], sites: List[CommentedMap], - keyword: str = None) -> Optional[List[TorrentInfo]]: - """ - 搜索站点,多个站点需要多线程处理 - :param mediainfo: 识别的媒体信息 - :param sites: 站点列表 - :param keyword: 搜索关键词,如有按关键词搜索,否则按媒体信息名称搜索 - :reutrn: 资源列表 - """ - # 开始计时 - start_time = datetime.now() - # 多线程 - executor = ThreadPoolExecutor(max_workers=len(sites)) - all_task = [] - for site in sites: - task = executor.submit(self.__search, mediainfo=mediainfo, - site=site, keyword=keyword) - all_task.append(task) - results = [] - finish_count = 0 - for future in as_completed(all_task): - finish_count += 1 - result = future.result() - if result: - results += result - logger.info(f"站点搜索进度:{finish_count} / {len(all_task)}") - # 计算耗时 - end_time = datetime.now() - logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds} 秒") - # 返回 - return results - - def __search(self, mediainfo: MediaInfo, site: CommentedMap, - keyword: str = None) -> Optional[List[TorrentInfo]]: + def search_torrents(self, site: CommentedMap, mediainfo: MediaInfo = None, + keyword: str = None) -> List[TorrentInfo]: """ 搜索一个站点 :param mediainfo: 识别的媒体信息 @@ -140,10 +107,10 @@ class IndexerModule(_ModuleBase): return _spider.is_error, _spider.get_torrents() - def refresh_torrents(self, sites: List[CommentedMap]) -> Optional[List[TorrentInfo]]: + def refresh_torrents(self, site: CommentedMap) -> Optional[List[TorrentInfo]]: """ 获取站点最新一页的种子,多个站点需要多线程处理 - :param sites: 站点列表 + :param site: 站点 :reutrn: 种子资源列表 """ - return self.search_torrents(mediainfo=None, sites=sites, keyword=None) + return self.search_torrents(site=site) diff --git a/app/schemas/types.py b/app/schemas/types.py index de55ad21..8d5973c1 100644 --- a/app/schemas/types.py +++ b/app/schemas/types.py @@ -49,3 +49,11 @@ class SiteSchema(Enum): TorrentLeech = "TorrentLeech" FileList = "FileList" TNode = "TNode" + + +# 处理进度Key字典 +class ProgressKey(Enum): + # 搜索 + Search = "search" + # 转移 + FileTransfer = "filetransfer"