This commit is contained in:
jxxghp
2023-08-08 21:32:11 +08:00
parent cb9bc2da78
commit 12d93ca083
7 changed files with 376 additions and 4 deletions

68
app/api/endpoints/rss.py Normal file
View File

@ -0,0 +1,68 @@
from typing import List, Any
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app import schemas
from app.core.security import verify_token
from app.db import get_db
from app.db.models.rss import Rss
router = APIRouter()
@router.get("/", summary="所有自定义订阅", response_model=List[schemas.Rss])
def read_rsses(
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询所有自定义订阅
"""
return Rss.list(db)
@router.post("/", summary="新增自定义订阅", response_model=schemas.Response)
def create_rss(
*,
db: Session = Depends(get_db),
rss_in: schemas.Rss,
_: schemas.TokenPayload = Depends(verify_token)
) -> Any:
"""
新增自定义订阅
"""
rss = Rss.get_by_tmdbid(db, tmdbid=rss_in.tmdbid, season=rss_in.season)
if rss:
return schemas.Response(success=False, message="自定义订阅已存在")
rss = Rss(**rss_in.dict())
rss.create(db)
return schemas.Response(success=True)
@router.put("/", summary="更新自定义订阅", response_model=schemas.Response)
def update_rss(
*,
rss_in: schemas.Rss,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)
) -> Any:
"""
更新自定义订阅信息
"""
rss = Rss.get(db, rss_in.id)
if not rss:
return schemas.Response(success=False, message="自定义订阅不存在")
rss.update(db, rss_in.dict())
return schemas.Response(success=True)
@router.get("/{rssid}", summary="查询订阅详情", response_model=schemas.Rss)
def read_rss(
rssid: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
根据ID查询自定义订阅详情
"""
return Rss.get(db, rssid)

155
app/chain/rss.py Normal file
View File

@ -0,0 +1,155 @@
import re
import time
from app.chain import ChainBase
from app.chain.download import DownloadChain
from app.core.config import settings
from app.core.context import Context, TorrentInfo, MediaInfo
from app.core.metainfo import MetaInfo
from app.db.rss_oper import RssOper
from app.db.systemconfig_oper import SystemConfigOper
from app.helper.rss import RssHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas import Notification
from app.schemas.types import SystemConfigKey, MediaType, NotificationType
from app.utils.string import StringUtils
class RssChain(ChainBase):
"""
RSS处理链
"""
def __init__(self):
super().__init__()
self.rssoper = RssOper()
self.sites = SitesHelper()
self.systemconfig = SystemConfigOper()
self.downloadchain = DownloadChain()
def refresh(self):
"""
刷新RSS订阅数据
"""
# 所有RSS订阅
logger.info("开始刷新RSS订阅数据 ...")
rss_tasks = self.rssoper.list() or []
for rss_task in rss_tasks:
if not rss_task.url:
continue
# 下载Rss报文
items = RssHelper.parse(rss_task.url, True if rss_task.proxy else False)
if not items:
logger.error(f"RSS未下载到数据{rss_task.url}")
logger.info(f"{rss_task.name} RSS下载到数据{len(items)}")
# 过滤规则
if rss_task.best_version:
filter_rule = self.systemconfig.get(SystemConfigKey.FilterRules2)
else:
filter_rule = self.systemconfig.get(SystemConfigKey.FilterRules)
# 处理RSS条目
matched_contexts = []
for item in items:
if not item.get("title"):
continue
# 基本要素匹配
if rss_task.include \
and not re.search(r"%s" % rss_task.include, item.get("title")):
logger.info(f"{item.get('title')} 未包含 {rss_task.include}")
continue
if rss_task.exclude \
and re.search(r"%s" % rss_task.exclude, item.get("title")):
logger.info(f"{item.get('title')} 包含 {rss_task.exclude}")
continue
# 检查站点
domain = StringUtils.get_url_domain(item.get("enclosure"))
site_info = self.sites.get_indexer(domain)
if not site_info:
logger.error(f"{item.get('title')} 没有维护对应站点")
continue
# 识别媒体信息
meta = MetaInfo(title=item.get("title"), subtitle=item.get("description"))
if not meta.name:
logger.error(f"{item.get('title')} 未识别到有效信息")
continue
mediainfo = self.recognize_media(meta=meta)
if not mediainfo:
logger.error(f"{item.get('title')} 未识别到TMDB媒体信息")
continue
if mediainfo.tmdb_id != rss_task.tmdbid:
logger.error(f"{item.get('title')} 不匹配")
continue
# 种子
torrentinfo = TorrentInfo(
site=site_info.get("id"),
site_name=site_info.get("name"),
site_cookie=site_info.get("cookie"),
site_ua=site_info.get("cookie") or settings.USER_AGENT,
site_proxy=site_info.get("proxy"),
site_order=site_info.get("pri"),
title=item.get("title"),
description=item.get("description"),
enclosure=item.get("enclosure"),
page_url=item.get("link"),
size=item.get("size"),
pubdate=time.strftime("%Y-%m-%d %H:%M:%S", item.get("pubdate")) if item.get("pubdate") else None,
)
# 过滤种子
result = self.filter_torrents(
rule_string=filter_rule,
torrent_list=[torrentinfo]
)
if not result:
logger.info(f"{rss_task.name} 不匹配过滤规则")
continue
# 匹配
mediainfo.clear()
matched_contexts.append(Context(
meta_info=meta,
media_info=mediainfo,
torrent_info=torrentinfo
))
# 过滤规则
if not matched_contexts:
logger.info(f"{rss_task.name} 未匹配到数据")
continue
logger.info(f"{rss_task.name} 匹配到 {len(matched_contexts)} 条数据")
# 查询本地存在情况
if not rss_task.best_version:
# 查询缺失的媒体信息
rss_meta = MetaInfo(title=rss_task.title)
rss_meta.type = MediaType(rss_task.type)
exist_flag, no_exists = self.downloadchain.get_no_exists_info(
meta=rss_meta,
mediainfo=MediaInfo(
title=rss_task.title,
year=rss_task.year,
tmdb_id=rss_task.tmdbid,
season=rss_task.season
),
)
if exist_flag:
logger.info(f'{rss_task.name} 媒体库中已存在,完成订阅')
self.rssoper.delete(rss_task.id)
# 发送通知
self.post_message(Notification(mtype=NotificationType.Subscribe,
title=f'自定义订阅 {rss_task.name} 已完成',
image=rss_task.backdrop))
continue
else:
no_exists = {}
# 开始下载
downloads, lefts = self.downloadchain.batch_download(contexts=matched_contexts,
no_exists=no_exists)
if downloads and not lefts:
if not rss_task.best_version:
self.rssoper.delete(rss_task.id)
# 发送通知
self.post_message(Notification(mtype=NotificationType.Subscribe,
title=f'自定义订阅 {rss_task.name} 已完成',
image=rss_task.backdrop))
# 未完成下载
logger.info(f'{rss_task.name} 未下载未完整,继续订阅 ...')
logger.info("刷新RSS订阅数据完成")

View File

@ -414,15 +414,15 @@ class SubscribeChain(ChainBase):
else:
logger.info(f'{indexer.get("name")} 获取到种子')
# 从缓存中匹配订阅
self.__match(torrents_cache)
self.match(torrents_cache)
# 保存缓存到本地
self.save_cache(torrents_cache, self._cache_file)
def __match(self, torrents_cache: Dict[str, List[Context]]):
def match(self, torrents: Dict[str, List[Context]]):
"""
从缓存中匹配订阅,并自动下载
"""
if not torrents_cache:
if not torrents:
logger.warn('没有缓存资源,无法匹配订阅')
return
# 所有订阅
@ -482,7 +482,7 @@ class SubscribeChain(ChainBase):
no_exists = {}
# 遍历缓存种子
_match_context = []
for domain, contexts in torrents_cache.items():
for domain, contexts in torrents.items():
for context in contexts:
# 检查是否匹配
torrent_meta = context.meta_info

60
app/db/models/rss.py Normal file
View File

@ -0,0 +1,60 @@
from sqlalchemy import Column, Integer, String, Sequence
from sqlalchemy.orm import Session
from app.db.models import Base
class Rss(Base):
"""
RSS订阅
"""
id = Column(Integer, Sequence('id'), primary_key=True, index=True)
# 名称
name = Column(String, nullable=False)
# RSS地址
url = Column(String, nullable=False)
# 类型
type = Column(String)
# 标题
title = Column(String)
# 年份
year = Column(String)
# TMDBID
tmdbid = Column(Integer, index=True)
# 季号
season = Column(Integer)
# 海报
poster = Column(String)
# 背景图
backdrop = Column(String)
# 评分
vote = Column(Integer)
# 简介
description = Column(String)
# 包含
include = Column(String)
# 排除
exclude = Column(String)
# 洗版
best_version = Column(Integer)
# 是否使用代理服务器
proxy = Column(Integer)
# 保存路径
save_path = Column(String)
# 附加信息,已处理数据
note = Column(String)
# 最后更新时间
last_update = Column(String)
# 状态 0-停用1-启用
state = Column(Integer)
@staticmethod
def get_by_tmdbid(db: Session, tmdbid: int, season: int = None):
if season:
return db.query(Rss).filter(Rss.tmdbid == tmdbid,
Rss.season == season).all()
return db.query(Rss).filter(Rss.tmdbid == tmdbid).all()
@staticmethod
def get_by_title(db: Session, title: str):
return db.query(Rss).filter(Rss.title == title).first()

40
app/db/rss_oper.py Normal file
View File

@ -0,0 +1,40 @@
from typing import List
from app.db import DbOper, SessionLocal
from app.db.models.rss import Rss
class RssOper(DbOper):
"""
RSS订阅数据管理
"""
def __init__(self, db=SessionLocal()):
super().__init__(db)
def add(self, **kwargs) -> bool:
"""
新增RSS订阅
"""
item = Rss(**kwargs)
if not item.get_by_tmdbid(self._db, tmdbid=kwargs.get("tmdbid"),
season=kwargs.get("season")):
item.create(self._db)
return True
return False
def list(self) -> List[Rss]:
"""
查询所有RSS订阅
"""
return Rss.list(self._db)
def delete(self, rssid: int) -> bool:
"""
删除RSS订阅
"""
item = Rss.get(self._db, rssid)
if item:
item.delete(self._db)
return True
return False

View File

@ -12,3 +12,4 @@ from .mediaserver import *
from .message import *
from .tmdb import *
from .transfer import *
from .rss import *

48
app/schemas/rss.py Normal file
View File

@ -0,0 +1,48 @@
from typing import Optional
from pydantic import BaseModel
class Rss(BaseModel):
id: Optional[int]
# 名称
name: Optional[str]
# RSS地址
url: Optional[str]
# 类型
type: Optional[str]
# 标题
title: Optional[str]
# 年份
year: Optional[str]
# TMDBID
tmdbid: Optional[int]
# 季号
season: Optional[int]
# 海报
poster: Optional[str]
# 背景图
backdrop: Optional[str]
# 评分
vote: Optional[float]
# 简介
description: Optional[str]
# 包含
include: Optional[str]
# 排除
exclude: Optional[str]
# 洗版
best_version: Optional[int]
# 是否使用代理服务器
proxy: Optional[int]
# 保存路径
save_path: Optional[str]
# 附加信息
note: Optional[str]
# 最后更新时间
last_update: Optional[str]
# 状态 0-停用1-启用
state: Optional[int]
class Config:
orm_mode = True