2023-08-30 16:14:21 +08:00

321 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import re
from datetime import datetime
from typing import Tuple, Optional
from sqlalchemy.orm import Session
from app.chain import ChainBase
from app.chain.download import DownloadChain
from app.core.config import settings
from app.core.context import Context, TorrentInfo, MediaInfo
from app.core.metainfo import MetaInfo
from app.db.rss_oper import RssOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.message import MessageHelper
from app.helper.rss import RssHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas import Notification, NotExistMediaInfo
from app.schemas.types import SystemConfigKey, MediaType, NotificationType
from app.utils.string import StringUtils
class RssChain(ChainBase):
    """
    RSS processing chain.

    Creates custom RSS subscriptions (``add``) and periodically refreshes them
    (``refresh``): downloads each feed, matches its entries against the
    subscribed media and hands the matches to the download chain.
    """

    def __init__(self, db: Optional[Session] = None):
        """
        Create the chain and the helpers it works with.

        :param db: database session forwarded to the base chain; the operators
                   below are built on ``self._db``
        """
        super().__init__(db)
        self.rssoper = RssOper(self._db)
        self.sites = SitesHelper()
        self.systemconfig = SystemConfigOper(self._db)
        self.downloadchain = DownloadChain(self._db)
        self.message = MessageHelper()

    def add(self, title: str, year: str,
            mtype: Optional[MediaType] = None,
            season: Optional[int] = None,
            **kwargs) -> Tuple[Optional[int], str]:
        """
        Recognize the media described by ``title`` and add a custom RSS subscription.

        :param title: title used to build the metadata that is recognized
        :param year: when truthy, overrides the year of the parsed metadata
        :param mtype: when truthy, overrides the media type of the parsed metadata
        :param season: season number; when truthy the metadata is forced to TV.
                       For TV media without a season, season 1 is used
        :param kwargs: extra subscription fields forwarded to ``RssOper.add``.
                       ``name`` and ``total_episode`` are filled in when missing;
                       ``tmdbid``, ``poster``, ``backdrop``, ``vote`` and
                       ``description`` are always overwritten from the media info
        :return: ``(subscription id, "")`` on success, ``(None, error message)``
                 on failure
        """
        logger.info(f'开始添加自定义订阅,标题:{title} ...')
        # Build the metadata, letting explicit arguments override what was parsed
        metainfo = MetaInfo(title)
        if year:
            metainfo.year = year
        if mtype:
            metainfo.type = mtype
        if season:
            metainfo.type = MediaType.TV
            metainfo.begin_season = season
        # Recognize the media
        mediainfo: MediaInfo = self.recognize_media(meta=metainfo)
        if not mediainfo:
            logger.warn(f'{title} 未识别到媒体信息')
            return None, "未识别到媒体信息"
        # Update the media images
        self.obtain_images(mediainfo=mediainfo)
        if mediainfo.type == MediaType.TV:
            if not season:
                season = 1
            # Work out the total episode count unless the caller supplied it
            if not kwargs.get('total_episode'):
                if not mediainfo.seasons:
                    # Season data is missing: recognize again by TMDB id
                    mediainfo = self.recognize_media(mtype=mediainfo.type,
                                                     tmdbid=mediainfo.tmdb_id)
                    if not mediainfo:
                        logger.error("媒体信息识别失败!")
                        return None, "媒体信息识别失败"
                    if not mediainfo.seasons:
                        logger.error(f"{title} 媒体信息中没有季集信息")
                        return None, "媒体信息中没有季集信息"
                total_episode = len(mediainfo.seasons.get(season) or [])
                if not total_episode:
                    logger.error(f'{title} 未获取到总集数')
                    return None, "未获取到总集数"
                kwargs.update({
                    'total_episode': total_episode
                })
        # Refuse duplicates
        if self.rssoper.exists(tmdbid=mediainfo.tmdb_id, season=season):
            logger.warn(f'{mediainfo.title} 已存在')
            return None, f'{mediainfo.title} 自定义订阅已存在'
        if not kwargs.get("name"):
            kwargs.update({
                "name": mediainfo.title
            })
        kwargs.update({
            "tmdbid": mediainfo.tmdb_id,
            "poster": mediainfo.get_poster_image(),
            "backdrop": mediainfo.get_backdrop_image(),
            "vote": mediainfo.vote_average,
            "description": mediainfo.overview,
        })
        # Store the subscription
        sid = self.rssoper.add(title=title, year=year, season=season, **kwargs)
        if not sid:
            logger.error(f'{mediainfo.title_year} 添加自定义订阅失败')
            return None, "添加自定义订阅失败"
        logger.info(f'{mediainfo.title_year} {metainfo.season} 添加订阅成功')
        return sid, ""

    def refresh(self, rssid: Optional[int] = None, manual: bool = False):
        """
        Refresh RSS subscriptions.

        :param rssid: forwarded to ``RssOper.list`` to select the subscriptions
        :param manual: when true, a completion message is put on the message
                       helper once all tasks have been processed
        """
        logger.info("开始刷新RSS订阅数据 ...")
        rss_tasks = self.rssoper.list(rssid) or []
        for rss_task in rss_tasks:
            # Skip empty rows and subscriptions without a feed address
            if not rss_task or not rss_task.url:
                continue
            self._refresh_task(rss_task)
        logger.info("刷新RSS订阅数据完成")
        if manual:
            if len(rss_tasks) == 1:
                self.message.put(f"{rss_tasks[0].name} 自定义订阅刷新完成")
            else:
                self.message.put("自定义订阅刷新完成")

    def _refresh_task(self, rss_task) -> None:
        """
        Refresh a single RSS subscription: download, match, download torrents
        and record what was handled.

        :param rss_task: subscription record returned by ``RssOper.list``
        """
        # Download the feed
        items = RssHelper.parse(rss_task.url, bool(rss_task.proxy))
        if not items:
            logger.error(f"RSS未下载到数据{rss_task.url}")
            # Nothing to process; returning also avoids len(None) below
            return
        logger.info(f"{rss_task.name} RSS下载到数据{len(items)}")
        # Site the feed belongs to (may be unknown)
        domain = StringUtils.get_url_domain(rss_task.url)
        site_info = self.sites.get_indexer(domain) or {}
        # Filter rules: best-version subscriptions use their own rule set
        if rss_task.best_version:
            filter_rule = self.systemconfig.get(SystemConfigKey.FilterRules2)
        else:
            filter_rule = self.systemconfig.get(SystemConfigKey.FilterRules)
        # Titles / season-episodes handled by earlier refreshes
        processed_data = self._load_processed_data(rss_task.note)
        matched_contexts = self._match_items(rss_task=rss_task,
                                             items=items,
                                             site_info=site_info,
                                             filter_rule=filter_rule,
                                             processed_data=processed_data)
        if not matched_contexts:
            logger.info(f"{rss_task.name} 未匹配到数据")
            return
        logger.info(f"{rss_task.name} 匹配到 {len(matched_contexts)} 条数据")
        if not rss_task.best_version:
            # Ask the download chain what is still missing
            rss_meta = MetaInfo(title=rss_task.title)
            rss_meta.year = rss_task.year
            rss_meta.begin_season = rss_task.season
            rss_meta.type = MediaType(rss_task.type)
            # Total episodes per season
            totals = {}
            if rss_task.season and rss_task.total_episode:
                totals = {
                    rss_task.season: rss_task.total_episode
                }
            exist_flag, no_exists = self.downloadchain.get_no_exists_info(
                meta=rss_meta,
                mediainfo=MediaInfo(
                    title=rss_task.title,
                    year=rss_task.year,
                    tmdb_id=rss_task.tmdbid,
                    season=rss_task.season
                ),
                totals=totals
            )
            if exist_flag:
                logger.info(f'{rss_task.name} 媒体库中已存在,完成订阅')
                self._finish_subscription(rss_task)
                return
            # rss_meta.type is an enum member, so compare with the member
            # itself rather than with its raw value
            if rss_meta.type == MediaType.TV:
                # Log the missing episodes
                if no_exists and no_exists.get(rss_task.tmdbid):
                    no_exists_info = no_exists.get(rss_task.tmdbid).get(rss_task.season)
                    if no_exists_info:
                        logger.info(f'订阅 {rss_task.name} 缺失集:{no_exists_info.episodes}')
        elif rss_task.type == MediaType.TV.value:
            # Best-version mode treats the whole season as missing. Use the
            # same {tmdbid: {season: info}} nesting that is read back from
            # get_no_exists_info above
            no_exists = {
                rss_task.tmdbid: {
                    rss_task.season: NotExistMediaInfo(
                        season=rss_task.season,
                        episodes=[],
                        total_episode=rss_task.total_episode,
                        start_episode=1)
                }
            }
        else:
            no_exists = {}
        # Start downloading
        downloads, lefts = self.downloadchain.batch_download(contexts=matched_contexts,
                                                             no_exists=no_exists,
                                                             save_path=rss_task.save_path)
        if downloads and not lefts:
            if not rss_task.best_version:
                # Everything was downloaded: a regular subscription is done
                self._finish_subscription(rss_task)
        else:
            # Download incomplete, keep the subscription
            logger.info(f'{rss_task.name} 未下载未完整,继续订阅 ...')
        if downloads:
            # Remember what was downloaded so later refreshes skip it
            for download in downloads:
                meta = download.meta_info
                processed_data['titles'].append(meta.org_string)
                processed_data['season_episodes'].append(meta.season_episode)
            self.rssoper.update(rssid=rss_task.id, note=json.dumps(processed_data))
        # Update the last-update time and the processed counter
        self.rssoper.update(rssid=rss_task.id,
                            processed=(rss_task.processed or 0) + len(downloads or []),
                            last_update=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def _match_items(self, rss_task, items: list, site_info: dict,
                     filter_rule, processed_data: dict) -> list:
        """
        Select the feed entries that belong to the subscription.

        :param rss_task: subscription record being refreshed
        :param items: parsed feed entries (dict-like, read via ``get``)
        :param site_info: indexer information of the feed's site, may be empty
        :param filter_rule: rule string passed to ``filter_torrents``
        :param processed_data: dict with ``titles`` and ``season_episodes`` lists
        :return: list of ``Context`` objects for the matching entries
        """
        matched_contexts = []
        for item in items:
            title = item.get("title")
            if not title:
                continue
            # Title already handled by an earlier refresh
            if title in processed_data['titles']:
                logger.info(f"{title} 已处理过")
                continue
            # Include / exclude patterns are regular expressions
            if rss_task.include \
                    and not re.search(str(rss_task.include), title):
                logger.info(f"{title} 未包含 {rss_task.include}")
                continue
            if rss_task.exclude \
                    and re.search(str(rss_task.exclude), title):
                logger.info(f"{title} 包含 {rss_task.exclude}")
                continue
            # Recognize the media of this entry
            meta = MetaInfo(title=title, subtitle=item.get("description"))
            if not meta.name:
                logger.error(f"{title} 未识别到有效信息")
                continue
            mediainfo = self.recognize_media(meta=meta)
            if not mediainfo:
                logger.error(f"{title} 未识别到TMDB媒体信息")
                continue
            if mediainfo.tmdb_id != rss_task.tmdbid:
                logger.error(f"{title} 不匹配")
                continue
            # Season / episode already handled by an earlier refresh
            if meta.season_episode in processed_data['season_episodes']:
                logger.info(f"{meta.org_string} {meta.season_episode} 已处理过")
                continue
            # Torrent description
            pubdate = item.get("pubdate")
            torrentinfo = TorrentInfo(
                site=site_info.get("id"),
                site_name=site_info.get("name"),
                site_cookie=site_info.get("cookie"),
                # The user agent comes from the site's "ua" setting, not its cookie
                site_ua=site_info.get("ua") or settings.USER_AGENT,
                site_proxy=site_info.get("proxy") or rss_task.proxy,
                site_order=site_info.get("pri"),
                title=title,
                description=item.get("description"),
                enclosure=item.get("enclosure"),
                page_url=item.get("link"),
                size=item.get("size"),
                pubdate=pubdate.strftime("%Y-%m-%d %H:%M:%S") if pubdate else None,
            )
            # Apply the filter rules when the subscription asks for it
            if rss_task.filter:
                result = self.filter_torrents(
                    rule_string=filter_rule,
                    torrent_list=[torrentinfo]
                )
                if not result:
                    logger.info(f"{rss_task.name} 不匹配过滤规则")
                    continue
            # Drop data that is no longer needed
            mediainfo.clear()
            matched_contexts.append(Context(
                meta_info=meta,
                media_info=mediainfo,
                torrent_info=torrentinfo
            ))
        return matched_contexts

    def _finish_subscription(self, rss_task) -> None:
        """
        Delete a completed subscription and post the completion notification.

        :param rss_task: subscription record that has been completed
        """
        self.rssoper.delete(rss_task.id)
        self.post_message(Notification(mtype=NotificationType.Subscribe,
                                       title=f'自定义订阅 {rss_task.name} 已完成',
                                       image=rss_task.backdrop))

    @staticmethod
    def _load_processed_data(note: Optional[str]) -> dict:
        """
        Decode the JSON note that records already handled entries.

        :param note: JSON text stored on the subscription, or a falsy value
        :return: dict that always has list values under ``titles`` and
                 ``season_episodes``
        """
        processed_data = json.loads(note) if note else {}
        if not isinstance(processed_data, dict):
            processed_data = {}
        # Guarantee both lists exist so membership tests and appends are safe
        processed_data['titles'] = processed_data.get('titles') or []
        processed_data['season_episodes'] = processed_data.get('season_episodes') or []
        return processed_data