add 实时进度API

This commit is contained in:
jxxghp 2023-06-19 18:38:59 +08:00
parent 9d8866de47
commit b181af40cd
9 changed files with 220 additions and 89 deletions

View File

@ -1,7 +1,7 @@
from fastapi import APIRouter
from app.api.endpoints import login, user, site, message, webhook, subscribe, \
media, douban, search, plugin, tmdb, history
media, douban, search, plugin, tmdb, history, system
api_router = APIRouter()
api_router.include_router(login.router, tags=["login"])
@ -15,4 +15,5 @@ api_router.include_router(search.router, prefix="/search", tags=["search"])
api_router.include_router(douban.router, prefix="/douban", tags=["douban"])
api_router.include_router(tmdb.router, prefix="/tmdb", tags=["tmdb"])
api_router.include_router(history.router, prefix="/history", tags=["history"])
api_router.include_router(system.router, prefix="/system", tags=["system"])
api_router.include_router(plugin.router, prefix="/plugin", tags=["plugin"])

View File

@ -0,0 +1,25 @@
import asyncio
import json
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from app.helper.progress import ProgressHelper
router = APIRouter()
@router.get("/progress/{process_type}", summary="实时进度")
async def get_progress(process_type: str):
    """
    Stream the live progress of the given process type to the client,
    formatted as Server-Sent Events (SSE).
    """
    progress_helper = ProgressHelper()

    async def sse_stream():
        # Poll the shared progress store and push a snapshot every 200 ms.
        while True:
            snapshot = progress_helper.get(process_type)
            yield f"data: {json.dumps(snapshot)}\n\n"
            await asyncio.sleep(0.2)

    return StreamingResponse(sse_stream(), media_type="text/event-stream")

View File

@ -148,24 +148,24 @@ class ChainBase(AbstractSingleton, metaclass=Singleton):
"""
return self.run_module("search_medias", meta=meta)
def search_torrents(self, mediainfo: Optional[MediaInfo], sites: List[CommentedMap],
keyword: str = None) -> Optional[List[TorrentInfo]]:
def search_torrents(self, mediainfo: Optional[MediaInfo], site: CommentedMap,
keyword: str = None) -> List[TorrentInfo]:
"""
搜索站点多个站点需要多线程处理
搜索一个站点的种子资源
:param mediainfo: 识别的媒体信息
:param sites: 站点列表
:param site: 站点
:param keyword: 搜索关键词如有按关键词搜索否则按媒体信息名称搜索
:return: 资源列表
"""
return self.run_module("search_torrents", mediainfo=mediainfo, sites=sites, keyword=keyword)
return self.run_module("search_torrents", mediainfo=mediainfo, site=site, keyword=keyword)
def refresh_torrents(self, sites: List[CommentedMap]) -> Optional[List[TorrentInfo]]:
def refresh_torrents(self, site: CommentedMap) -> List[TorrentInfo]:
"""
获取站点最新一页的种子多个站点需要多线程处理
:param sites: 站点列表
:param site: 站点
:return: 种子资源列表
"""
return self.run_module("refresh_torrents", sites=sites)
return self.run_module("refresh_torrents", site=site)
def filter_torrents(self, torrent_list: List[TorrentInfo],
season_episodes: Dict[int, list] = None) -> List[TorrentInfo]:
@ -284,7 +284,7 @@ class ChainBase(AbstractSingleton, metaclass=Singleton):
:return: 成功或失败
"""
return self.run_module("post_torrents_message", title=title, mediainfo=mediainfo,
items=items, userid=userid)
items=items, userid=userid)
def scrape_metadata(self, path: Path, mediainfo: MediaInfo) -> None:
"""

View File

@ -1,13 +1,18 @@
from typing import Optional, List, Dict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Dict
from typing import List, Optional
from app.chain import ChainBase
from app.core.config import settings
from app.core.context import Context, MediaInfo, TorrentInfo
from app.core.context import Context
from app.core.context import MediaInfo, TorrentInfo
from app.core.metainfo import MetaInfo
from app.helper.progress import ProgressHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas import NotExistMediaInfo
from app.schemas.types import MediaType
from app.schemas.types import MediaType, ProgressKey
from app.utils.string import StringUtils
@ -19,6 +24,7 @@ class SearchChain(ChainBase):
def __init__(self):
super().__init__()
self.siteshelper = SitesHelper()
self.progress = ProgressHelper()
def search_by_tmdbid(self, tmdbid: int, mtype: str = None) -> Optional[List[Context]]:
"""
@ -32,24 +38,13 @@ class SearchChain(ChainBase):
return None
return self.process(mediainfo=mediainfo)
def search_by_title(self, title: str, site_ids: List[int] = None) -> List[TorrentInfo]:
def search_by_title(self, title: str) -> List[TorrentInfo]:
"""
根据标题搜索资源不识别不过滤直接返回站点内容
"""
logger.info(f'开始搜索资源,关键词:{title} ...')
# 未开启的站点不搜索
indexer_sites = []
for indexer in self.siteshelper.get_indexers():
if not settings.INDEXER_SITES \
or any([s in indexer.get("domain") for s in settings.INDEXER_SITES.split(',')]):
if site_ids and indexer.get("id") not in site_ids:
continue
indexer_sites.append(indexer)
if not indexer_sites:
logger.warn('未开启任何有效站点,无法搜索资源')
return []
# 搜索
return self.search_torrents(mediainfo=None, sites=indexer_sites, keyword=title)
return self.__search_all_sites(keyword=title)
def process(self, mediainfo: MediaInfo,
keyword: str = None,
@ -61,20 +56,6 @@ class SearchChain(ChainBase):
:param no_exists: 缺失的媒体信息
"""
logger.info(f'开始搜索资源,关键词:{keyword or mediainfo.title} ...')
# 未开启的站点不搜索
indexer_sites = []
for indexer in self.siteshelper.get_indexers():
if not settings.INDEXER_SITES \
or any([s in indexer.get("domain") for s in settings.INDEXER_SITES.split(',')]):
# 站点流控
state, msg = self.siteshelper.check(indexer.get("domain"))
if not state:
logger.warn(msg)
continue
indexer_sites.append(indexer)
if not indexer_sites:
logger.warn('未开启任何有效站点,无法搜索资源')
return []
# 补充媒体信息
if not mediainfo.names:
mediainfo: MediaInfo = self.recognize_media(mtype=mediainfo.type,
@ -90,10 +71,9 @@ class SearchChain(ChainBase):
else:
season_episodes = None
# 执行搜索
torrents: List[TorrentInfo] = self.search_torrents(
torrents: List[TorrentInfo] = self.__search_all_sites(
mediainfo=mediainfo,
keyword=keyword,
sites=indexer_sites
keyword=keyword
)
if not torrents:
logger.warn(f'{keyword or mediainfo.title} 未搜索到资源')
@ -107,11 +87,21 @@ class SearchChain(ChainBase):
if not torrents:
logger.warn(f'{keyword or mediainfo.title} 没有符合过滤条件的资源')
return []
# 过滤不匹配的资源
logger.info(f'开始匹配,总 {len(torrents)} 个资源 ...')
# 匹配的资源
_match_torrents = []
# 总数
_total = len(torrents)
# 已处理数
_count = 0
if mediainfo:
self.progress.start(ProgressKey.Search)
logger.info(f'开始匹配,总 {_total} 个资源 ...')
self.progress.update(value=0, text=f'开始匹配,总 {_total} 个资源 ...', key=ProgressKey.Search)
for torrent in torrents:
_count += 1
self.progress.update(value=(_count / _total) * 100,
text=f'正在匹配 {torrent.site_name},已完成 {_count} / {_total} ...',
key=ProgressKey.Search)
# 比对IMDBID
if torrent.imdbid \
and mediainfo.imdb_id \
@ -149,6 +139,10 @@ class SearchChain(ChainBase):
logger.info(f'{mediainfo.title} 匹配到资源:{torrent.site_name} - {torrent.title}')
_match_torrents.append(torrent)
break
self.progress.update(value=100,
text=f'匹配完成,共匹配到 {len(_match_torrents)} 个资源',
key=ProgressKey.Search)
self.progress.end(ProgressKey.Search)
else:
_match_torrents = torrents
logger.info(f"匹配完成,共匹配到 {len(_match_torrents)} 个资源")
@ -156,3 +150,67 @@ class SearchChain(ChainBase):
return [Context(meta=MetaInfo(title=torrent.title, subtitle=torrent.description),
mediainfo=mediainfo,
torrentinfo=torrent) for torrent in _match_torrents]
def __search_all_sites(self, mediainfo: Optional[MediaInfo] = None,
                       keyword: str = None) -> Optional[List[TorrentInfo]]:
    """
    Search all enabled indexer sites concurrently (one thread per site).
    :param mediainfo: identified media info; None for a raw keyword search
    :param keyword: search keyword; when given, search by keyword instead of
                    the media's names
    :return: list of torrent resources (empty list when no site is enabled)
    """
    # Determine the enabled sites. An empty INDEXER_SITES setting means
    # "all sites"; otherwise only sites whose domain matches an entry.
    enabled_domains = settings.INDEXER_SITES.split(',') if settings.INDEXER_SITES else None
    indexer_sites = []
    for indexer in self.siteshelper.get_indexers():
        if enabled_domains and not any(s in indexer.get("domain") for s in enabled_domains):
            continue
        # Per-site rate-limit (流控) check
        state, msg = self.siteshelper.check(indexer.get("domain"))
        if not state:
            logger.warn(msg)
            continue
        indexer_sites.append(indexer)
    if not indexer_sites:
        logger.warn('未开启任何有效站点,无法搜索资源')
        return []

    # Start progress reporting and timing
    self.progress.start(ProgressKey.Search)
    start_time = datetime.now()
    total_num = len(indexer_sites)
    finish_count = 0
    self.progress.update(value=0,
                         text=f"开始搜索,共 {total_num} 个站点 ...",
                         key=ProgressKey.Search)

    # Fan out one search task per site. The context manager guarantees the
    # pool is shut down and its threads reclaimed (the original code never
    # called executor.shutdown(), leaking worker threads on every search).
    results = []
    with ThreadPoolExecutor(max_workers=total_num) as executor:
        all_task = [executor.submit(self.search_torrents, mediainfo=mediainfo,
                                    site=site, keyword=keyword)
                    for site in indexer_sites]
        for future in as_completed(all_task):
            finish_count += 1
            result = future.result()
            if result:
                results += result
            logger.info(f"站点搜索进度:{finish_count} / {total_num}")
            self.progress.update(value=finish_count / total_num * 100,
                                 text=f"正在搜索,已完成 {finish_count} / {total_num} 个站点 ...",
                                 key=ProgressKey.Search)

    # Finish progress reporting with the elapsed time
    end_time = datetime.now()
    self.progress.update(value=100,
                         text=f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}",
                         key=ProgressKey.Search)
    logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
    self.progress.end(ProgressKey.Search)
    return results

View File

@ -237,7 +237,7 @@ class SubscribeChain(ChainBase):
continue
logger.info(f'开始刷新站点资源,站点:{indexer.get("name")} ...')
domain = StringUtils.get_url_domain(indexer.get("domain"))
torrents: List[TorrentInfo] = self.refresh_torrents(sites=[indexer])
torrents: List[TorrentInfo] = self.refresh_torrents(site=indexer)
if torrents:
self._torrents_cache[domain] = []
# 过滤种子

View File

@ -1,5 +1,4 @@
import re
from pathlib import Path
from typing import List, Optional, Union
from app.chain import ChainBase
@ -10,9 +9,10 @@ from app.core.metainfo import MetaInfo
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.models.downloadhistory import DownloadHistory
from app.db.transferhistory_oper import TransferHistoryOper
from app.helper.progress import ProgressHelper
from app.log import logger
from app.schemas import TransferInfo, TransferTorrent
from app.schemas.types import TorrentStatus, EventType, MediaType
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey
from app.utils.string import StringUtils
@ -25,6 +25,7 @@ class TransferChain(ChainBase):
super().__init__()
self.downloadhis = DownloadHistoryOper()
self.transferhis = TransferHistoryOper()
self.progress = ProgressHelper()
def process(self, arg_str: str = None, userid: Union[str, int] = None) -> bool:
"""
@ -68,8 +69,20 @@ class TransferChain(ChainBase):
return False
logger.info(f"获取到 {len(torrents)} 个已完成的下载任务")
# 识别
# 开始进度
self.progress.start(ProgressKey.FileTransfer)
# 总数
total_num = len(torrents)
# 已处理数量
processed_num = 0
self.progress.update(value=0,
text=f"开始转移下载任务文件,共 {total_num} 个任务 ...",
key=ProgressKey.FileTransfer)
for torrent in torrents:
# 更新进度
self.progress.update(value=processed_num / total_num * 100,
text=f"正在转移 {torrent.title} ...",
key=ProgressKey.FileTransfer)
# 识别前预处理
result: Optional[tuple] = self.prepare_recognize(title=torrent.title)
if result:
@ -145,7 +158,14 @@ class TransferChain(ChainBase):
'mediainfo': mediainfo,
'transferinfo': transferinfo
})
# 计数
processed_num += 1
# 更新进度
self.progress.update(value=processed_num / total_num * 100,
text=f"{torrent.title} 转移完成",
key=ProgressKey.FileTransfer)
# 结束进度
self.progress.end(ProgressKey.FileTransfer)
logger.info("下载器文件转移执行完成")
return True

52
app/helper/progress.py Normal file
View File

@ -0,0 +1,52 @@
from enum import Enum
from typing import Union, Dict
from app.schemas.types import ProgressKey
from app.utils.singleton import Singleton
class ProgressHelper(metaclass=Singleton):
    """
    Process-wide singleton that tracks the progress of long-running tasks
    (search, file transfer, ...) so it can be polled and streamed to the
    frontend. Each task is identified by a ProgressKey (or plain string).
    """
    # key -> {"enable": bool, "value": float (0-100), "text": str}
    _process_detail: Dict[str, dict] = {}

    def __init__(self):
        self._process_detail = {}

    def init_config(self):
        pass

    @staticmethod
    def __to_key(key: Union[ProgressKey, str]) -> str:
        """Normalize a ProgressKey enum member (or plain string) to a string."""
        return key.value if isinstance(key, Enum) else key

    def __reset(self, key: Union[ProgressKey, str]):
        """Reset progress for *key* to the disabled initial state."""
        self._process_detail[self.__to_key(key)] = {
            "enable": False,
            "value": 0,
            "text": "请稍候..."
        }

    def start(self, key: Union[ProgressKey, str]):
        """Reset and enable progress reporting for *key*."""
        self.__reset(key)
        self._process_detail[self.__to_key(key)]['enable'] = True

    def end(self, key: Union[ProgressKey, str]):
        """Disable progress reporting for *key* (no-op if never started)."""
        detail = self._process_detail.get(self.__to_key(key))
        if not detail:
            return
        detail['enable'] = False

    def update(self, key: Union[ProgressKey, str], value: float = None, text: str = None):
        """
        Update progress for *key*; ignored unless start() was called first.
        :param value: completion percentage (0-100); 0 is a valid value
        :param text: human-readable status message (empty text is ignored)
        """
        skey = self.__to_key(key)
        if not self._process_detail.get(skey, {}).get('enable'):
            return
        if value is not None:
            # BUGFIX: the original `if value:` silently dropped value == 0,
            # so callers passing value=0 at the start of a run had no effect.
            self._process_detail[skey]['value'] = value
        if text:
            self._process_detail[skey]['text'] = text

    def get(self, key: Union[ProgressKey, str]) -> dict:
        """Return the progress dict for *key*, or None if unknown."""
        return self._process_detail.get(self.__to_key(key))

View File

@ -1,4 +1,3 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import List, Optional, Tuple, Union
@ -10,8 +9,8 @@ from app.modules import _ModuleBase
from app.modules.indexer.spider import TorrentSpider
from app.modules.indexer.tnode import TNodeSpider
from app.modules.indexer.torrentleech import TorrentLeech
from app.utils.string import StringUtils
from app.schemas.types import MediaType
from app.utils.string import StringUtils
class IndexerModule(_ModuleBase):
@ -28,40 +27,8 @@ class IndexerModule(_ModuleBase):
def init_setting(self) -> Tuple[str, Union[str, bool]]:
return "INDEXER", "builtin"
def search_torrents(self, mediainfo: Optional[MediaInfo], sites: List[CommentedMap],
keyword: str = None) -> Optional[List[TorrentInfo]]:
"""
搜索站点多个站点需要多线程处理
:param mediainfo: 识别的媒体信息
:param sites: 站点列表
:param keyword: 搜索关键词如有按关键词搜索否则按媒体信息名称搜索
:return: 资源列表
"""
# 开始计时
start_time = datetime.now()
# 多线程
executor = ThreadPoolExecutor(max_workers=len(sites))
all_task = []
for site in sites:
task = executor.submit(self.__search, mediainfo=mediainfo,
site=site, keyword=keyword)
all_task.append(task)
results = []
finish_count = 0
for future in as_completed(all_task):
finish_count += 1
result = future.result()
if result:
results += result
logger.info(f"站点搜索进度:{finish_count} / {len(all_task)}")
# 计算耗时
end_time = datetime.now()
logger.info(f"站点搜索完成,有效资源数:{len(results)},总耗时 {(end_time - start_time).seconds}")
# 返回
return results
def __search(self, mediainfo: MediaInfo, site: CommentedMap,
keyword: str = None) -> Optional[List[TorrentInfo]]:
def search_torrents(self, site: CommentedMap, mediainfo: MediaInfo = None,
keyword: str = None) -> List[TorrentInfo]:
"""
搜索一个站点
:param mediainfo: 识别的媒体信息
@ -140,10 +107,10 @@ class IndexerModule(_ModuleBase):
return _spider.is_error, _spider.get_torrents()
def refresh_torrents(self, sites: List[CommentedMap]) -> Optional[List[TorrentInfo]]:
def refresh_torrents(self, site: CommentedMap) -> Optional[List[TorrentInfo]]:
"""
获取站点最新一页的种子多个站点需要多线程处理
:param sites: 站点列表
:param site: 站点
:return: 种子资源列表
"""
return self.search_torrents(mediainfo=None, sites=sites, keyword=None)
return self.search_torrents(site=site)

View File

@ -49,3 +49,11 @@ class SiteSchema(Enum):
TorrentLeech = "TorrentLeech"
FileList = "FileList"
TNode = "TNode"
# Keys identifying each trackable long-running process
class ProgressKey(Enum):
    # Torrent/media search progress
    Search = "search"
    # Download file transfer progress
    FileTransfer = "filetransfer"