# File-viewer header (not part of the program): 2024-05-20 17:41:46 +08:00, 149 lines, 4.9 KiB, Python

from typing import Tuple, List
from ruamel.yaml import CommentedMap
from app.core.config import settings
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas import MediaType
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
class YemaSpider:
    """
    Search client for the YemaPT site's JSON API.

    An instance is configured from an indexer mapping (``id``, ``domain``,
    ``name``, ``proxy``, ``cookie``, ``ua``, ``timeout``) and exposes
    :meth:`search`, which returns the matching torrents as plain dicts.
    """
    # Indexer identity and connection settings; filled in by __init__.
    _indexerid = None
    _domain = None
    _name = ""
    _proxy = None
    _cookie = None
    _ua = None
    # Page size sent with every search request.
    _size = 40
    # URL templates. The first "%s" is the site domain, which is concatenated
    # directly with the path, so the domain is expected to end with "/".
    _searchurl = "%sapi/torrent/fetchCategoryOpenTorrentList"
    _downloadurl = "%sapi/torrent/download?id=%s"
    _pageurl = "%s#/torrent/detail/%s/"
    # Request timeout used when the indexer does not provide one.
    _timeout = 15
    # Category ids used by the site API.
    _movie_category = 4
    _tv_category = 5
    # Promotion name -> volume factor; unknown names fall back to 1.
    _download_factors = {
        "free": 0,
        "half": 0.5,
        "none": 1,
    }
    _upload_factors = {
        "none": 1,
        "one_half": 1.5,
        "double_upload": 2,
    }

    def __init__(self, indexer: CommentedMap):
        """
        Configure the spider from an indexer definition.

        :param indexer: mapping providing ``id``, ``domain``, ``name``,
            ``proxy``, ``cookie``, ``ua`` and ``timeout``. When it is empty
            or None the class-level defaults are kept.
        """
        self.systemconfig = SystemConfigOper()
        if indexer:
            self._indexerid = indexer.get('id')
            self._domain = indexer.get('domain')
            # Bind the search URL template to this indexer's domain once.
            self._searchurl = self._searchurl % self._domain
            self._name = indexer.get('name')
            if indexer.get('proxy'):
                self._proxy = settings.PROXY
            self._cookie = indexer.get('cookie')
            self._ua = indexer.get('ua')
            self._timeout = indexer.get('timeout') or 15

    def search(self, keyword: str, mtype: MediaType = None, page: int = 0) -> Tuple[bool, List[dict]]:
        """
        Search the site for torrents.

        :param keyword: search text; left out of the request when empty
        :param mtype: ``MediaType.TV`` selects the TV category; any other
            value (including None) selects the movie category
        :param page: zero-based page index; sent to the API as ``page + 1``
        :return: ``(error, torrents)``. ``error`` is True when the request
            failed, returned a non-200 status, or returned a body that is not
            valid JSON; in those cases ``torrents`` is empty.
        """
        # Everything that is not explicitly TV is searched as a movie.
        if mtype == MediaType.TV:
            category_id = self._tv_category
        else:
            category_id = self._movie_category
        params = {
            "categoryId": category_id,
            "pageParam": {
                "current": page + 1,
                "pageSize": self._size,
                "total": self._size
            },
            "sorter": {}
        }
        if keyword:
            params["keyword"] = keyword
        res = RequestUtils(
            headers={
                "Content-Type": "application/json",
                "User-Agent": f"{self._ua}",
                "Accept": "application/json, text/plain, */*"
            },
            cookies=self._cookie,
            proxies=self._proxy,
            referer=f"{self._domain}",
            timeout=self._timeout
        ).post_res(url=self._searchurl, json=params)
        if res is None:
            logger.warn(f"{self._name} 搜索失败,无法连接 {self._domain}")
            return True, []
        # A response object may itself be falsy (e.g. for error statuses), so
        # both the truthiness and the exact status code are checked.
        if not (res and res.status_code == 200):
            logger.warn(f"{self._name} 搜索失败,错误码:{res.status_code}")
            return True, []
        try:
            payload = res.json()
        except ValueError:
            # A 200 status does not guarantee a JSON body (login or anti-bot
            # pages are HTML). Assumes a requests-style response whose JSON
            # decode errors derive from ValueError - TODO confirm against
            # RequestUtils.post_res.
            logger.warn(f"{self._name} 搜索失败,响应内容不是有效的JSON")
            return True, []
        results = (payload.get('data') or []) if isinstance(payload, dict) else []
        # Skip anything that is not a record mapping instead of failing on it.
        torrents = [
            self.__parse_torrent(result)
            for result in results
            if isinstance(result, dict)
        ]
        return False, torrents

    def __parse_torrent(self, result: dict) -> dict:
        """
        Convert one record of the API's ``data`` list into a torrent dict.

        :param result: a single record from the search response
        :return: dict with the keys ``title``, ``description``, ``enclosure``,
            ``pubdate``, ``size``, ``seeders``, ``peers``, ``grabs``,
            ``downloadvolumefactor``, ``uploadvolumefactor``, ``freedate``,
            ``page_url``, ``labels`` and ``category``
        """
        category_value = result.get('categoryId')
        if category_value == self._tv_category:
            category = MediaType.TV.value
        elif category_value == self._movie_category:
            category = MediaType.MOVIE.value
        else:
            category = MediaType.UNKNOWN.value
        torrent_id = result.get('id')
        return {
            'title': result.get('showName'),
            'description': result.get('shortDesc'),
            'enclosure': self.__get_download_url(torrent_id),
            'pubdate': StringUtils.unify_datetime_str(result.get('gmtCreate')),
            'size': result.get('fileSize'),
            'seeders': result.get('seedNum'),
            'peers': result.get('leechNum'),
            'grabs': result.get('completedNum'),
            'downloadvolumefactor': self.__get_downloadvolumefactor(result.get('downloadPromotion')),
            'uploadvolumefactor': self.__get_uploadvolumefactor(result.get('uploadPromotion')),
            'freedate': StringUtils.unify_datetime_str(result.get('downloadPromotionEndTime')),
            'page_url': self._pageurl % (self._domain, torrent_id),
            'labels': [],
            'category': category
        }

    @staticmethod
    def __get_downloadvolumefactor(discount: str) -> float:
        """
        Return the download volume factor for a download promotion name.

        :param discount: promotion name (``free``, ``half`` or ``none``)
        :return: 0, 0.5 or 1; 1 when the name is empty or not recognised
        """
        if discount:
            return YemaSpider._download_factors.get(discount, 1)
        return 1

    @staticmethod
    def __get_uploadvolumefactor(discount: str) -> float:
        """
        Return the upload volume factor for an upload promotion name.

        :param discount: promotion name (``none``, ``one_half`` or
            ``double_upload``)
        :return: 1, 1.5 or 2; 1 when the name is empty or not recognised
        """
        if discount:
            return YemaSpider._upload_factors.get(discount, 1)
        return 1

    def __get_download_url(self, torrent_id: str) -> str:
        """
        Build the download link for a torrent.

        :param torrent_id: the torrent's ``id`` as returned by the API
        :return: the download URL on the configured domain
        """
        return self._downloadurl % (self._domain, torrent_id)