145 lines
4.8 KiB
Python
145 lines
4.8 KiB
Python
import base64
|
|
import json
|
|
import re
|
|
from typing import Tuple, List
|
|
|
|
from ruamel.yaml import CommentedMap
|
|
|
|
from app.core.config import settings
|
|
from app.log import logger
|
|
from app.schemas import MediaType
|
|
from app.utils.http import RequestUtils
|
|
from app.utils.string import StringUtils
|
|
|
|
|
|
class MTorrentSpider:
|
|
_indexerid = None
|
|
_domain = None
|
|
_name = ""
|
|
_proxy = None
|
|
_cookie = None
|
|
_ua = None
|
|
_size = 100
|
|
_searchurl = "%sapi/torrent/search"
|
|
_downloadurl = "%sapi/torrent/genDlToken"
|
|
_pageurl = "%sdetail/%s"
|
|
|
|
# 电影分类
|
|
_movie_category = ['401', '419', '420', '421', '439', '405', '404']
|
|
_tv_category = ['403', '402', '435', '438', '404', '405']
|
|
|
|
# 标签
|
|
_labels = {
|
|
0: "",
|
|
4: "中字",
|
|
6: "国配",
|
|
}
|
|
|
|
def __init__(self, indexer: CommentedMap):
|
|
if indexer:
|
|
self._indexerid = indexer.get('id')
|
|
self._domain = indexer.get('domain')
|
|
self._searchurl = self._searchurl % self._domain
|
|
self._name = indexer.get('name')
|
|
if indexer.get('proxy'):
|
|
self._proxy = settings.PROXY
|
|
self._cookie = indexer.get('cookie')
|
|
self._ua = indexer.get('ua')
|
|
|
|
def search(self, keyword: str, mtype: MediaType = None, page: int = 0) -> Tuple[bool, List[dict]]:
|
|
if not mtype:
|
|
categories = []
|
|
elif mtype == MediaType.TV:
|
|
categories = self._tv_category
|
|
else:
|
|
categories = self._movie_category
|
|
params = {
|
|
"keyword": keyword,
|
|
"categories": categories,
|
|
"pageNumber": int(page) + 1,
|
|
"pageSize": self._size,
|
|
"visible": 1
|
|
}
|
|
res = RequestUtils(
|
|
headers={
|
|
"Content-Type": "application/json",
|
|
"User-Agent": f"{self._ua}"
|
|
},
|
|
cookies=self._cookie,
|
|
proxies=self._proxy,
|
|
referer=f"{self._domain}browse",
|
|
timeout=30
|
|
).post_res(url=self._searchurl, json=params)
|
|
torrents = []
|
|
if res and res.status_code == 200:
|
|
results = res.json().get('data', {}).get("data") or []
|
|
for result in results:
|
|
torrent = {
|
|
'title': result.get('name'),
|
|
'description': result.get('smallDescr'),
|
|
'enclosure': self.__get_download_url(result.get('id')),
|
|
'pubdate': StringUtils.format_timestamp(result.get('createdDate')),
|
|
'size': result.get('size'),
|
|
'seeders': result.get('status', {}).get("seeders"),
|
|
'peers': result.get('status', {}).get("leechers"),
|
|
'grabs': result.get('status', {}).get("timesCompleted"),
|
|
'downloadvolumefactor': self.__get_downloadvolumefactor(result.get('status', {}).get("discount")),
|
|
'uploadvolumefactor': self.__get_uploadvolumefactor(result.get('status', {}).get("discount")),
|
|
'page_url': self._pageurl % (self._domain, result.get('id')),
|
|
'imdbid': self.__find_imdbid(result.get('imdb')),
|
|
'labels': [self._labels.get(result.get('labels') or 0)] if result.get('labels') else []
|
|
}
|
|
torrents.append(torrent)
|
|
elif res is not None:
|
|
logger.warn(f"{self._name} 搜索失败,错误码:{res.status_code}")
|
|
return True, []
|
|
else:
|
|
logger.warn(f"{self._name} 搜索失败,无法连接 {self._domain}")
|
|
return True, []
|
|
return False, torrents
|
|
|
|
@staticmethod
|
|
def __find_imdbid(imdb: str) -> str:
|
|
if imdb:
|
|
m = re.search(r"tt\d+", imdb)
|
|
if m:
|
|
return m.group(0)
|
|
return ""
|
|
|
|
@staticmethod
|
|
def __get_downloadvolumefactor(discount: str) -> float:
|
|
discount_dict = {
|
|
"FREE": 0,
|
|
"PERCENT_50": 0.5,
|
|
"PERCENT_70": 0.3,
|
|
"_2X_FREE": 0,
|
|
"_2X_PERCENT_50": 0.5
|
|
}
|
|
if discount:
|
|
return discount_dict.get(discount, 1)
|
|
return 1
|
|
|
|
@staticmethod
|
|
def __get_uploadvolumefactor(discount: str) -> float:
|
|
uploadvolumefactor_dict = {
|
|
"_2X": 2.0,
|
|
"_2X_FREE": 2.0,
|
|
"_2X_PERCENT_50": 2.0
|
|
}
|
|
if discount:
|
|
return uploadvolumefactor_dict.get(discount, 1)
|
|
return 1
|
|
|
|
def __get_download_url(self, torrent_id: str) -> str:
|
|
url = self._downloadurl % self._domain
|
|
params = {
|
|
'method': 'post',
|
|
'params': {
|
|
'id': torrent_id
|
|
},
|
|
'result': 'data'
|
|
}
|
|
# base64编码
|
|
base64_str = base64.b64encode(json.dumps(params).encode('utf-8')).decode('utf-8')
|
|
return f"[{base64_str}]{url}"
|