这是您的第237次签到, + # 已连续签到237天。
本次签到获得300克猫粮。
"} + logger.info(f"签到成功") + return True, f'【{site}】签到成功' + else: + # {"status":"0","data":"抱歉","message":"您今天已经签到过了,请勿重复刷新。"} + logger.info(f"今日已签到") + return True, f'【{site}】今日已签到' diff --git a/app/plugins/autosignin/sites/tjupt.py b/app/plugins/autosignin/sites/tjupt.py new file mode 100644 index 00000000..f66eed91 --- /dev/null +++ b/app/plugins/autosignin/sites/tjupt.py @@ -0,0 +1,272 @@ +import json +import os +import time +from io import BytesIO +from typing import Tuple + +from PIL import Image +from lxml import etree +from ruamel.yaml import CommentedMap + +from app.core import settings +from app.log import logger +from app.plugins.autosignin.sites import _ISiteSigninHandler +from app.utils.http import RequestUtils +from app.utils.string import StringUtils + + +class Tjupt(_ISiteSigninHandler): + """ + 北洋签到 + """ + # 匹配的站点Url,每一个实现类都需要设置为自己的站点Url + site_url = "tjupt.org" + + # 签到地址 + _sign_in_url = 'https://www.tjupt.org/attendance.php' + + # 已签到 + _sign_regex = ['今日已签到'] + + # 签到成功 + _succeed_regex = ['这是您的首次签到,本次签到获得\\d+个魔力值。', + '签到成功,这是您的第\\d+次签到,已连续签到\\d+天,本次签到获得\\d+个魔力值。', + '重新签到成功,本次签到获得\\d+个魔力值'] + + # 存储正确的答案,后续可直接查 + _answer_path = settings.TEMP_PATH / "signin/" + _answer_file = _answer_path / "tjupt.json" + + @classmethod + def match(cls, url: str) -> bool: + """ + 根据站点Url判断是否匹配当前站点签到类,大部分情况使用默认实现即可 + :param url: 站点Url + :return: 是否匹配,如匹配则会调用该类的signin方法 + """ + return True if StringUtils.url_equal(url, cls.site_url) else False + + def signin(self, site_info: CommentedMap) -> Tuple[bool, str]: + """ + 执行签到操作 + :param site_info: 站点信息,含有站点Url、站点Cookie、UA等信息 + :return: 签到结果信息 + """ + site = site_info.get("name") + site_cookie = site_info.get("cookie") + ua = site_info.get("ua") + proxy = settings.PROXY if site_info.get("proxy") else None + + # 创建正确答案存储目录 + if not os.path.exists(os.path.dirname(self._answer_file)): + os.makedirs(os.path.dirname(self._answer_file)) + + # 获取北洋签到页面html + html_res = RequestUtils(cookies=site_cookie, + headers=ua, + proxies=proxy + ).get_res(url=self._sign_in_url) + + # 获取签到后返回html,判断是否签到成功 + if not html_res or html_res.status_code != 200: + logger.error(f"签到失败,请检查站点连通性") + return False, f'【{site}】签到失败,请检查站点连通性' + + if "login.php" in html_res.text: + logger.error(f"签到失败,cookie失效") + return False, f'【{site}】签到失败,cookie失效' + + sign_status = self.sign_in_result(html_res=html_res.text, + regexs=self._sign_regex) + if sign_status: + logger.info(f"今日已签到") + return True, f'【{site}】今日已签到' + + # 没有签到则解析html + html = etree.HTML(html_res.text) + if not html: + return False, f'【{site}】签到失败' + img_url = html.xpath('//table[@class="captcha"]//img/@src')[0] + + if not img_url: + logger.error(f"签到失败,未获取到签到图片") + return False, f'【{site}】签到失败,未获取到签到图片' + + # 签到图片 + img_url = "https://www.tjupt.org" + img_url + logger.info(f"获取到签到图片 {img_url}") + # 获取签到图片hash + captcha_img_res = RequestUtils(cookies=site_cookie, + headers=ua, + proxies=proxy + ).get_res(url=img_url) + if not captcha_img_res or captcha_img_res.status_code != 200: + logger.error(f"签到图片 {img_url} 请求失败") + return False, f'【{site}】签到失败,未获取到签到图片' + captcha_img = Image.open(BytesIO(captcha_img_res.content)) + captcha_img_hash = self._tohash(captcha_img) + logger.debug(f"签到图片hash {captcha_img_hash}") + + # 签到答案选项 + values = html.xpath("//input[@name='answer']/@value") + options = html.xpath("//input[@name='answer']/following-sibling::text()") + + if not values or not options: + logger.error(f"签到失败,未获取到答案选项") + return False, f'【{site}】签到失败,未获取到答案选项' + + # value+选项 + answers = list(zip(values, options)) + 
logger.debug(f"获取到所有签到选项 {answers}") + + # 查询已有答案 + exits_answers = {} + try: + with open(self._answer_file, 'r') as f: + json_str = f.read() + exits_answers = json.loads(json_str) + # 查询本地本次验证码hash答案 + captcha_answer = exits_answers[captcha_img_hash] + + # 本地存在本次hash对应的正确答案再遍历查询 + if captcha_answer: + for value, answer in answers: + if str(captcha_answer) == str(answer): + # 确实是答案 + return self.__signin(answer=value, + site_cookie=site_cookie, + ua=ua, + proxy=proxy, + site=site) + except (FileNotFoundError, IOError, OSError) as e: + logger.debug(f"查询本地已知答案失败:{e},继续请求豆瓣查询") + + # 本地不存在正确答案则请求豆瓣查询匹配 + for value, answer in answers: + if answer: + # 豆瓣检索 + db_res = RequestUtils().get_res(url=f'https://movie.douban.com/j/subject_suggest?q={answer}') + if not db_res or db_res.status_code != 200: + logger.debug(f"签到选项 {answer} 未查询到豆瓣数据") + continue + + # 豆瓣返回结果 + db_answers = json.loads(db_res.text) + if not isinstance(db_answers, list): + db_answers = [db_answers] + + if len(db_answers) == 0: + logger.debug(f"签到选项 {answer} 查询到豆瓣数据为空") + + for db_answer in db_answers: + answer_img_url = db_answer['img'] + + # 获取答案hash + answer_img_res = RequestUtils().get_res(url=answer_img_url) + if not answer_img_res or answer_img_res.status_code != 200: + logger.debug(f"签到答案 {answer} {answer_img_url} 请求失败") + continue + + answer_img = Image.open(BytesIO(answer_img_res.content)) + answer_img_hash = self._tohash(answer_img) + logger.debug(f"签到答案图片hash {answer} {answer_img_hash}") + + # 获取选项图片与签到图片相似度,大于0.9默认是正确答案 + score = self._comparehash(captcha_img_hash, answer_img_hash) + logger.info(f"签到图片与选项 {answer} 豆瓣图片相似度 {score}") + if score > 0.9: + # 确实是答案 + return self.__signin(answer=value, + site_cookie=site_cookie, + ua=ua, + proxy=proxy, + site=site, + exits_answers=exits_answers, + captcha_img_hash=captcha_img_hash) + + # 间隔5s,防止请求太频繁被豆瓣屏蔽ip + time.sleep(5) + logger.error(f"豆瓣图片匹配,未获取到匹配答案") + + # 没有匹配签到成功,则签到失败 + return False, f'【{site}】签到失败,未获取到匹配答案' + + def __signin(self, answer, site_cookie, ua, proxy, site, exits_answers=None, captcha_img_hash=None): + """ + 签到请求 + """ + data = { + 'answer': answer, + 'submit': '提交' + } + logger.debug(f"提交data {data}") + sign_in_res = RequestUtils(cookies=site_cookie, + headers=ua, + proxies=proxy + ).post_res(url=self._sign_in_url, data=data) + if not sign_in_res or sign_in_res.status_code != 200: + logger.error(f"签到失败,签到接口请求失败") + return False, f'【{site}】签到失败,签到接口请求失败' + + # 获取签到后返回html,判断是否签到成功 + sign_status = self.sign_in_result(html_res=sign_in_res.text, + regexs=self._succeed_regex) + if sign_status: + logger.info(f"签到成功") + if exits_answers and captcha_img_hash: + # 签到成功写入本地文件 + self.__write_local_answer(exits_answers=exits_answers or {}, + captcha_img_hash=captcha_img_hash, + answer=answer) + return True, f'【{site}】签到成功' + else: + logger.error(f"签到失败,请到页面查看") + return False, f'【{site}】签到失败,请到页面查看' + + def __write_local_answer(self, exits_answers, captcha_img_hash, answer): + """ + 签到成功写入本地文件 + """ + try: + exits_answers[captcha_img_hash] = answer + # 序列化数据 + formatted_data = json.dumps(exits_answers, indent=4) + with open(self._answer_file, 'w') as f: + f.write(formatted_data) + except (FileNotFoundError, IOError, OSError) as e: + logger.debug(f"签到成功写入本地文件失败:{e}") + + @staticmethod + def _tohash(img, shape=(10, 10)): + """ + 获取图片hash + """ + img = img.resize(shape) + gray = img.convert('L') + s = 0 + hash_str = '' + for i in range(shape[1]): + for j in range(shape[0]): + s = s + gray.getpixel((j, i)) + avg = s / (shape[0] * shape[1]) + for i in range(shape[1]): 
+ @staticmethod + def _tohash(img, shape=(10, 10)): + """ + 获取图片hash + """ + img = img.resize(shape) + gray = img.convert('L') + s = 0 + hash_str = '' + for i in range(shape[1]): + for j in range(shape[0]): + s = s + gray.getpixel((j, i)) + avg = s / (shape[0] * shape[1]) + for i in range(shape[1]): + for j in range(shape[0]): + if gray.getpixel((j, i)) > avg: + hash_str = hash_str + '1' + else: + hash_str = hash_str + '0' + return hash_str + + @staticmethod + def _comparehash(hash1, hash2, shape=(10, 10)): + """ + 比较图片hash + 返回相似度 + """ + n = 0 + if len(hash1) != len(hash2): + return -1 + for i in range(len(hash1)): + if hash1[i] == hash2[i]: + n = n + 1 + return n / (shape[0] * shape[1]) diff --git a/app/plugins/autosignin/sites/ttg.py b/app/plugins/autosignin/sites/ttg.py new file mode 100644 index 00000000..156b9854 --- /dev/null +++ b/app/plugins/autosignin/sites/ttg.py @@ -0,0 +1,96 @@ +import re +from typing import Tuple + +from ruamel.yaml import CommentedMap + +from app.core import settings +from app.log import logger +from app.plugins.autosignin.sites import _ISiteSigninHandler +from app.utils.http import RequestUtils +from app.utils.string import StringUtils + + +class TTG(_ISiteSigninHandler): + """ + TTG签到 + """ + # 匹配的站点Url,每一个实现类都需要设置为自己的站点Url + site_url = "totheglory.im" + + # 已签到 + _sign_regex = ['已签到'] + _sign_text = '亲,您今天已签到过,不要太贪哦' + + # 签到成功 + _success_text = '您已连续签到' + + @classmethod + def match(cls, url: str) -> bool: + """ + 根据站点Url判断是否匹配当前站点签到类,大部分情况使用默认实现即可 + :param url: 站点Url + :return: 是否匹配,如匹配则会调用该类的signin方法 + """ + return True if StringUtils.url_equal(url, cls.site_url) else False + + def signin(self, site_info: CommentedMap) -> Tuple[bool, str]: + """ + 执行签到操作 + :param site_info: 站点信息,含有站点Url、站点Cookie、UA等信息 + :return: 签到结果信息 + """ + site = site_info.get("name") + site_cookie = site_info.get("cookie") + ua = site_info.get("ua") + proxy = settings.PROXY if site_info.get("proxy") else None + + # 获取页面html + html_res = RequestUtils(cookies=site_cookie, + headers=ua, + proxies=proxy + ).get_res(url="https://totheglory.im") + if not html_res or html_res.status_code != 200: + logger.error(f"签到失败,请检查站点连通性") + return False, f'【{site}】签到失败,请检查站点连通性' + + if "login.php" in html_res.text: + logger.error(f"签到失败,cookie失效") + return False, f'【{site}】签到失败,cookie失效' + + # 判断是否已签到 + html_res.encoding = "utf-8" + sign_status = self.sign_in_result(html_res=html_res.text, + regexs=self._sign_regex) + if sign_status: + logger.info(f"今日已签到") + return True, f'【{site}】今日已签到' + + # 获取签到参数,先判断正则是否匹配,页面改版时避免对None调用group()报错 + signed_timestamp_match = re.search('(?<=signed_timestamp: ")\\d{10}', html_res.text) + signed_token_match = re.search('(?<=signed_token: ").*(?=")', html_res.text) + if not signed_timestamp_match or not signed_token_match: + logger.error(f"签到失败,未获取到签到参数") + return False, f'【{site}】签到失败,未获取到签到参数' + signed_timestamp = signed_timestamp_match.group() + signed_token = signed_token_match.group() + logger.debug(f"signed_timestamp={signed_timestamp} signed_token={signed_token}") + + data = { + 'signed_timestamp': signed_timestamp, + 'signed_token': signed_token + } + # 签到 + sign_res = RequestUtils(cookies=site_cookie, + headers=ua, + proxies=proxy + ).post_res(url="https://totheglory.im/signed.php", + data=data) + if not sign_res or sign_res.status_code != 200: + logger.error(f"签到失败,签到接口请求失败") + return False, f'【{site}】签到失败,签到接口请求失败' + + sign_res.encoding = "utf-8" + if self._success_text in sign_res.text: + logger.info(f"签到成功") + return True, f'【{site}】签到成功' + if self._sign_text in sign_res.text: + logger.info(f"今日已签到") + return True, f'【{site}】今日已签到' + + logger.error(f"签到失败,未知原因") + return False, f'【{site}】签到失败,未知原因'
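ttg.py scrapes its two signature parameters out of inline JavaScript with fixed-width look-behind patterns. A standalone sanity check of those exact patterns against an invented page fragment (the real page markup may differ):

import re

page = 'signed_timestamp: "1683266938", signed_token: "abc123DEF"'  # hypothetical excerpt

ts = re.search(r'(?<=signed_timestamp: ")\d{10}', page)
token = re.search(r'(?<=signed_token: ").*(?=")', page)
if ts and token:  # the same None-guard the handler applies
    print(ts.group(), token.group())  # 1683266938 abc123DEF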
diff --git a/app/plugins/autosignin/sites/u2.py b/app/plugins/autosignin/sites/u2.py new file mode 100644 index 00000000..d1af38c5 --- /dev/null +++ b/app/plugins/autosignin/sites/u2.py @@ -0,0 +1,122 @@ +import datetime +import random +import re +from typing import Tuple + +from lxml import etree +from ruamel.yaml import CommentedMap + +from app.core import settings +from app.log import logger +from app.plugins.autosignin.sites import _ISiteSigninHandler +from app.utils.http import RequestUtils +from app.utils.string import StringUtils + + +class U2(_ISiteSigninHandler): + """ + U2签到 随机 + """ + # 匹配的站点Url,每一个实现类都需要设置为自己的站点Url + site_url = "u2.dmhy.org" + + # 已签到 + _sign_regex = ['已签到', + 'Show Up', + 'Показать', + '已簽到'] + + # 签到成功 + _success_text = "window.location.href = 'showup.php';" + + @classmethod + def match(cls, url: str) -> bool: + """ + 根据站点Url判断是否匹配当前站点签到类,大部分情况使用默认实现即可 + :param url: 站点Url + :return: 是否匹配,如匹配则会调用该类的signin方法 + """ + return True if StringUtils.url_equal(url, cls.site_url) else False + + def signin(self, site_info: CommentedMap) -> Tuple[bool, str]: + """ + 执行签到操作 + :param site_info: 站点信息,含有站点Url、站点Cookie、UA等信息 + :return: 签到结果信息 + """ + site = site_info.get("name") + site_cookie = site_info.get("cookie") + ua = site_info.get("ua") + proxy = settings.PROXY if site_info.get("proxy") else None + + now = datetime.datetime.now() + # 判断当前时间是否小于9点 + if now.hour < 9: + logger.error(f"签到失败,9点前不签到") + return False, f'【{site}】签到失败,9点前不签到' + + # 获取页面html + html_res = RequestUtils(cookies=site_cookie, + headers=ua, + proxies=proxy + ).get_res(url="https://u2.dmhy.org/showup.php") + if not html_res or html_res.status_code != 200: + logger.error(f"签到失败,请检查站点连通性") + return False, f'【{site}】签到失败,请检查站点连通性' + + if "login.php" in html_res.text: + logger.error(f"签到失败,cookie失效") + return False, f'【{site}】签到失败,cookie失效' + + # 判断是否已签到 + html_res.encoding = "utf-8" + sign_status = self.sign_in_result(html_res=html_res.text, + regexs=self._sign_regex) + if sign_status: + logger.info(f"今日已签到") + return True, f'【{site}】今日已签到' + + # 没有签到则解析html + html = etree.HTML(html_res.text) + + if not html: + return False, f'【{site}】签到失败' + + # 获取签到参数,先取列表再统一判空,缺失时不会因索引[0]报错 + req = html.xpath("//form//td/input[@name='req']/@value") + hash_str = html.xpath("//form//td/input[@name='hash']/@value") + form = html.xpath("//form//td/input[@name='form']/@value") + submit_name = html.xpath("//form//td/input[@type='submit']/@name") + submit_value = html.xpath("//form//td/input[@type='submit']/@value") + # 注意这里校验的是req的取值,而不是re模块本身 + if not req or not hash_str or not form or not submit_name or not submit_value: + logger.error("签到失败,未获取到相关签到参数") + return False, f'【{site}】签到失败' + req = req[0] + hash_str = hash_str[0] + form = form[0] + + # 随机一个答案,以实际选项数为上限 + answer_num = random.randint(0, len(submit_name) - 1) + data = { + 'req': req, + 'hash': hash_str, + 'form': form, + 'message': '一切随缘~', + submit_name[answer_num]: submit_value[answer_num] + } + # 签到 + sign_res = RequestUtils(cookies=site_cookie, + headers=ua, + proxies=proxy + ).post_res(url="https://u2.dmhy.org/showup.php?action=show", + data=data) + if not sign_res or sign_res.status_code != 200: + logger.error(f"签到失败,签到接口请求失败") + return False, f'【{site}】签到失败,签到接口请求失败' + + # 判断是否签到成功 + # sign_res.text = "<script>window.location.href = 'showup.php';</script>" + if self._success_text in sign_res.text: + logger.info(f"签到成功") + return True, f'【{site}】签到成功' + else: + logger.error(f"签到失败,未知原因") + return False, f'【{site}】签到失败,未知原因'
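u2.py answers the showup quiz by picking one of the submit buttons at random and posting its name/value pair alongside the hidden form fields. The selection step in isolation (all field names and values are stand-ins, not real site data):

import random

hidden = {'req': 'r1', 'hash': 'h1', 'form': 'f1', 'message': '一切随缘~'}
submit_names = ['answer0', 'answer1', 'answer2', 'answer3']   # hypothetical
submit_values = ['选项A', '选项B', '选项C', '选项D']           # hypothetical

pick = random.randint(0, len(submit_names) - 1)  # bounded by the real option count
data = {**hidden, submit_names[pick]: submit_values[pick]}
print(data)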
diff --git a/app/plugins/autosignin/sites/zhuque.py b/app/plugins/autosignin/sites/zhuque.py new file mode 100644 index 00000000..270e0f4f --- /dev/null +++ b/app/plugins/autosignin/sites/zhuque.py @@ -0,0 +1,86 @@ +import json +from typing import Tuple + +from lxml import etree +from ruamel.yaml import CommentedMap + +from app.core import settings +from app.log import logger +from app.plugins.autosignin.sites import _ISiteSigninHandler +from app.utils.http import RequestUtils +from app.utils.string import StringUtils + + +class ZhuQue(_ISiteSigninHandler): + """ + ZHUQUE签到 + """ + # 匹配的站点Url,每一个实现类都需要设置为自己的站点Url + site_url = "zhuque.in" + + @classmethod + def match(cls, url: str) -> bool: + """ + 根据站点Url判断是否匹配当前站点签到类,大部分情况使用默认实现即可 + :param url: 站点Url + :return: 是否匹配,如匹配则会调用该类的signin方法 + """ + return True if StringUtils.url_equal(url, cls.site_url) else False + + def signin(self, site_info: CommentedMap) -> Tuple[bool, str]: + """ + 执行签到操作 + :param site_info: 站点信息,含有站点Url、站点Cookie、UA等信息 + :return: 签到结果信息 + """ + site = site_info.get("name") + site_cookie = site_info.get("cookie") + ua = site_info.get("ua") + proxy = settings.PROXY if site_info.get("proxy") else None + + # 获取页面html + html_res = RequestUtils(cookies=site_cookie, + headers=ua, + proxies=proxy + ).get_res(url="https://zhuque.in") + if not html_res or html_res.status_code != 200: + logger.error(f"模拟登录失败,请检查站点连通性") + return False, f'【{site}】模拟登录失败,请检查站点连通性' + + if "login.php" in html_res.text: + logger.error(f"模拟登录失败,cookie失效") + return False, f'【{site}】模拟登录失败,cookie失效' + + html = etree.HTML(html_res.text) + + if not html: + return False, f'【{site}】模拟登录失败' + + # 释放技能 + msg = '失败' + # 先取列表再判空,页面缺少x-csrf-token时不会索引报错 + csrf_tokens = html.xpath("//meta[@name='x-csrf-token']/@content") + if csrf_tokens and csrf_tokens[0]: + x_csrf_token = csrf_tokens[0] + data = { + "all": 1, + "resetModal": "true" + } + headers = { + "x-csrf-token": str(x_csrf_token), + "Content-Type": "application/json; charset=utf-8", + "User-Agent": ua + } + skill_res = RequestUtils(cookies=site_cookie, + headers=headers, + proxies=proxy + ).post_res(url="https://zhuque.in/api/gaming/fireGenshinCharacterMagic", json=data) + if not skill_res or skill_res.status_code != 200: + logger.error(f"模拟登录失败,释放技能失败") + # 必须返回,否则skill_res为None时下方json解析会再次报错 + return False, f'【{site}】模拟登录失败,释放技能失败' + + # '{"status":200,"data":{"code":"FIRE_GENSHIN_CHARACTER_MAGIC_SUCCESS","bonus":0}}' + skill_dict = json.loads(skill_res.text) + if skill_dict['status'] == 200: + bonus = int(skill_dict['data']['bonus']) + msg = f'成功,获得{bonus}魔力' + + logger.info(f'【{site}】模拟登录成功,技能释放{msg}') + return True, f'【{site}】模拟登录成功,技能释放{msg}'
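zhuque.in exposes a CSRF token in a <meta> tag and expects it echoed back as an x-csrf-token header on its JSON API. A minimal sketch of the same handshake using plain requests instead of the project's RequestUtils wrapper (the cookie name is a placeholder; endpoint and payload are taken from the handler above):

import requests
from lxml import etree

session = requests.Session()
session.headers["User-Agent"] = "Mozilla/5.0"
session.cookies.update({"session": "PLACEHOLDER"})  # hypothetical cookie

page = session.get("https://zhuque.in").text
tokens = etree.HTML(page).xpath("//meta[@name='x-csrf-token']/@content")
if tokens:
    res = session.post("https://zhuque.in/api/gaming/fireGenshinCharacterMagic",
                       json={"all": 1, "resetModal": "true"},
                       headers={"x-csrf-token": tokens[0]})
    print(res.status_code, res.text)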
diff --git a/app/plugins/sitestatistics/__init__.py b/app/plugins/sitestatistics/__init__.py new file mode 100644 index 00000000..4abea59d --- /dev/null +++ b/app/plugins/sitestatistics/__init__.py @@ -0,0 +1,262 @@ +from datetime import datetime +from multiprocessing.dummy import Pool as ThreadPool +from threading import Lock +from typing import Optional, Any + +import requests +from ruamel.yaml import CommentedMap + +from app.core import settings +from app.helper import ModuleHelper +from app.helper.sites import SitesHelper +from app.log import logger +from app.plugins import _PluginBase +from app.plugins.sitestatistics.siteuserinfo import ISiteUserInfo +from app.utils.http import RequestUtils + +lock = Lock() + + +class SiteStatistics(_PluginBase): + sites = None + + _MAX_CONCURRENCY: int = 10 + _last_update_time: Optional[datetime] = None + _sites_data: dict = {} + _site_schema: list = None + + def init_plugin(self, config: dict = None): + # 加载模块 + self._site_schema = ModuleHelper.load('app.plugins.sitestatistics.siteuserinfo', + filter_func=lambda _, obj: hasattr(obj, 'schema')) + self._site_schema.sort(key=lambda x: x.order) + # 站点管理 + self.sites = SitesHelper() + # 站点上一次更新时间 + self._last_update_time = None + # 站点数据 + self._sites_data = {} + + def stop_service(self): + pass + + def __build_class(self, html_text: str) -> Any: + for site_schema in self._site_schema: + try: + if site_schema.match(html_text): + return site_schema + except Exception as e: + logger.error(f"站点 {site_schema.name} 匹配失败 {e}") + return None + + def build(self, url: str, site_name: str, + site_cookie: str = None, + ua: str = None, + proxy: bool = False) -> Any: + if not site_cookie: + return None + session = requests.Session() + logger.debug(f"站点 {site_name} url={url} site_cookie={site_cookie} ua={ua}") + proxies = settings.PROXY if proxy else None + res = RequestUtils(cookies=site_cookie, + session=session, + headers=ua, + proxies=proxies + ).get_res(url=url) + if res and res.status_code == 200: + if "charset=utf-8" in res.text or "charset=UTF-8" in res.text: + res.encoding = "UTF-8" + else: + res.encoding = res.apparent_encoding + html_text = res.text + # 第一次登录反爬 + if html_text.find("title") == -1: + i = html_text.find("window.location") + if i == -1: + return None + tmp_url = url + html_text[i:html_text.find(";")] \ + .replace("\"", "").replace("+", "").replace(" ", "").replace("window.location=", "") + res = RequestUtils(cookies=site_cookie, + session=session, + headers=ua, + proxies=proxies + ).get_res(url=tmp_url) + if res and res.status_code == 200: + if "charset=utf-8" in res.text or "charset=UTF-8" in res.text: + res.encoding = "UTF-8" + else: + res.encoding = res.apparent_encoding + html_text = res.text + if not html_text: + return None + else: + # res可能为None,取状态码前先判空 + logger.error("站点 %s 被反爬限制:%s, 状态码:%s" % (site_name, url, res.status_code if res else "无响应")) + return None + + # 兼容假首页情况,假首页通常没有 0: + for head, date, content in site_user_info.message_unread_contents: + msg_title = f"【站点 {site_user_info.site_name} 消息】" + msg_text = f"时间:{date}\n标题:{head}\n内容:\n{content}" + self.chain.run_module("post_message", title=msg_title, text=msg_text) + else: + self.chain.run_module("post_message", + title=f"站点 {site_user_info.site_name} 收到 " + f"{site_user_info.message_unread} 条新消息,请登陆查看") + + def refresh_all_site_data(self, force: bool = False, specify_sites: list = None): + """ + 多线程刷新站点下载上传量,默认间隔6小时 + """ + if not self.sites.get_indexers(): + return + + with lock: + + if not force \ + and not specify_sites \ + and self._last_update_time: + return + + if specify_sites \ + and not isinstance(specify_sites, list): + specify_sites = [specify_sites] + + # 没有指定站点,默认使用全部站点 + if not specify_sites: + refresh_sites = self.sites.get_indexers() + else: + refresh_sites = [site for site in self.sites.get_indexers() if + site.get("name") in specify_sites] + + if not refresh_sites: + return + + # 并发刷新 + with ThreadPool(min(len(refresh_sites), self._MAX_CONCURRENCY)) as p: + site_user_infos = p.map(self.__refresh_site_data, refresh_sites) + site_user_infos = [info for info in site_user_infos if info] + + print(site_user_infos) + # TODO 登记历史数据 + # TODO 实时用户数据 + # TODO 更新站点图标 + # TODO 实时做种信息 + + # 更新时间 + self._last_update_time = datetime.now() + + @staticmethod + def __todict(raw_statistics): + statistics = [] + for site in raw_statistics: + statistics.append({"site": site.SITE, + "username": site.USERNAME, + "user_level": site.USER_LEVEL, + "join_at": site.JOIN_AT, + "update_at": site.UPDATE_AT, + "upload": site.UPLOAD, + "download": site.DOWNLOAD, + "ratio": site.RATIO, + "seeding": site.SEEDING, + "leeching": site.LEECHING, + "seeding_size": site.SEEDING_SIZE, + "bonus": site.BONUS, + "url": site.URL, + "msg_unread": site.MSG_UNREAD + }) + return statistics
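refresh_all_site_data fans the per-site scrape out over a thread pool; multiprocessing.dummy provides the Pool API backed by threads, which fits this I/O-bound HTTP work. The same pattern in a self-contained sketch (the fetch function and site list are stand-ins for the real __refresh_site_data and indexer configs):

from multiprocessing.dummy import Pool as ThreadPool

MAX_CONCURRENCY = 10
sites = [{"name": f"site{i}"} for i in range(25)]  # stand-in site configs

def refresh_one(site):
    # Placeholder for the real per-site scrape; return None to signal failure
    return {"site": site["name"], "upload": 0}

with ThreadPool(min(len(sites), MAX_CONCURRENCY)) as pool:
    results = pool.map(refresh_one, sites)
results = [r for r in results if r]  # drop failed sites, as the plugin does
print(len(results))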
diff --git a/app/plugins/sitestatistics/siteuserinfo/__init__.py b/app/plugins/sitestatistics/siteuserinfo/__init__.py new file mode 100644 index 00000000..38d01073 --- /dev/null +++ b/app/plugins/sitestatistics/siteuserinfo/__init__.py @@ -0,0 +1,360 @@ +# -*- coding: utf-8 -*- +import base64 +import json +import re +from abc import ABCMeta, abstractmethod +from typing import Optional +from urllib.parse import urljoin, urlsplit + +import requests +from lxml import etree +from requests import Session + +from app.core import settings +from app.helper.cloudflare import under_challenge +from app.log import logger +from app.utils.http import RequestUtils +from app.utils.types import SiteSchema + +SITE_BASE_ORDER = 1000 + + +class ISiteUserInfo(metaclass=ABCMeta): + # 站点模版 + schema = SiteSchema.NexusPhp + # 站点解析时判断顺序,值越小越先解析 + order = SITE_BASE_ORDER + + def __init__(self, site_name: str, + url: str, + site_cookie: str, + index_html: str, + session: Session = None, + ua: str = None, + emulate: bool = False, + proxy: bool = None): + super().__init__() + # 站点信息 + self.site_name = None + self.site_url = None + self.site_favicon = None + # 用户信息 + self.username = None + self.userid = None + # 未读消息 + self.message_unread = 0 + self.message_unread_contents = [] + + # 流量信息 + self.upload = 0 + self.download = 0 + self.ratio = 0 + + # 种子信息 + self.seeding = 0 + self.leeching = 0 + self.uploaded = 0 + self.completed = 0 + self.incomplete = 0 + self.seeding_size = 0 + self.leeching_size = 0 + self.uploaded_size = 0 + self.completed_size = 0 + self.incomplete_size = 0 + # 做种人数, 种子大小 + self.seeding_info = [] + + # 用户详细信息 + self.user_level = None + self.join_at = None + self.bonus = 0.0 + + # 错误信息 + self.err_msg = None + # 内部数据 + self._base_url = None + self._site_cookie = None + self._index_html = None + self._addition_headers = None + + # 站点页面 + self._brief_page = "index.php" + self._user_detail_page = "userdetails.php?id=" + self._user_traffic_page = "index.php" + self._torrent_seeding_page = "getusertorrentlistajax.php?userid=" + self._user_mail_unread_page = "messages.php?action=viewmailbox&box=1&unread=yes" + self._sys_mail_unread_page = "messages.php?action=viewmailbox&box=-2&unread=yes" + self._torrent_seeding_params = None + self._torrent_seeding_headers = None + + split_url = urlsplit(url) + self.site_name = site_name + self.site_url = url + self._base_url = f"{split_url.scheme}://{split_url.netloc}" + self._favicon_url = urljoin(self._base_url, "favicon.ico") + self.site_favicon = "" + self._site_cookie = site_cookie + self._index_html = index_html + self._session = session if session else requests.Session() + self._ua = ua + + self._emulate = emulate + self._proxy = proxy + + def site_schema(self): + """ + 站点解析模型 + :return: 站点解析模型 + """ + return self.schema + + @classmethod + def match(cls, html_text: str) -> bool: + """ + 是否匹配当前解析模型 + :param html_text: 站点首页html + :return: 是否匹配 + """ + pass + + def parse(self): + """ + 解析站点信息 + :return: + """ + self._parse_favicon(self._index_html) + if not self._parse_logged_in(self._index_html): + return + + self._parse_site_page(self._index_html) + self._parse_user_base_info(self._index_html) + self._parse_unread_msgs() + if self._user_traffic_page: + self._parse_user_traffic_info(self._get_page_content(urljoin(self._base_url, self._user_traffic_page))) + if self._user_detail_page: + self._parse_user_detail_info(self._get_page_content(urljoin(self._base_url, self._user_detail_page))) + + self._parse_seeding_pages() + self.seeding_info = json.dumps(self.seeding_info) + + def _parse_unread_msgs(self): + """ + 解析所有未读消息标题和内容 + :return: + """ + unread_msg_links = [] + if self.message_unread > 0: + links = {self._user_mail_unread_page, self._sys_mail_unread_page} + for link in links: + if not link: + continue + + msg_links = [] + next_page = self._parse_message_unread_links( + self._get_page_content(urljoin(self._base_url, link)), msg_links) + while next_page: + next_page =
self._parse_message_unread_links( + self._get_page_content(urljoin(self._base_url, next_page)), msg_links) + + unread_msg_links.extend(msg_links) + + for msg_link in unread_msg_links: + logger.debug(f"{self.site_name} 信息链接 {msg_link}") + head, date, content = self._parse_message_content(self._get_page_content(urljoin(self._base_url, msg_link))) + logger.debug(f"{self.site_name} 标题 {head} 时间 {date} 内容 {content}") + self.message_unread_contents.append((head, date, content)) + + def _parse_seeding_pages(self): + if self._torrent_seeding_page: + # 第一页 + next_page = self._parse_user_torrent_seeding_info( + self._get_page_content(urljoin(self._base_url, self._torrent_seeding_page), + self._torrent_seeding_params, + self._torrent_seeding_headers)) + + # 其他页处理 + while next_page: + next_page = self._parse_user_torrent_seeding_info( + self._get_page_content(urljoin(urljoin(self._base_url, self._torrent_seeding_page), next_page), + self._torrent_seeding_params, + self._torrent_seeding_headers), + multi_page=True) + + @staticmethod + def _prepare_html_text(html_text): + """ + 处理掉HTML中的干扰部分 + """ + return re.sub(r"#\d+", "", re.sub(r"\d+px", "", html_text)) + + @abstractmethod + def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]: + """ + 获取未阅读消息链接 + :param html_text: + :return: + """ + pass + + def _parse_favicon(self, html_text): + """ + 解析站点favicon,返回base64 fav图标 + :param html_text: + :return: + """ + html = etree.HTML(html_text) + if html: + fav_link = html.xpath('//head/link[contains(@rel, "icon")]/@href') + if fav_link: + self._favicon_url = urljoin(self._base_url, fav_link[0]) + + res = RequestUtils(cookies=self._site_cookie, session=self._session, timeout=60, headers=self._ua).get_res( + url=self._favicon_url) + if res: + self.site_favicon = base64.b64encode(res.content).decode() + + def _get_page_content(self, url, params=None, headers=None): + """ + :param url: 网页地址 + :param params: post参数 + :param headers: 额外的请求头 + :return: + """ + req_headers = None + proxies = settings.PROXY if self._proxy else None + if self._ua or headers or self._addition_headers: + req_headers = {} + if headers: + req_headers.update(headers) + + req_headers.update({ + "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", + "User-Agent": f"{self._ua}" + }) + + if self._addition_headers: + req_headers.update(self._addition_headers) + + if params: + res = RequestUtils(cookies=self._site_cookie, + session=self._session, + timeout=60, + proxies=proxies, + headers=req_headers).post_res(url=url, data=params) + else: + res = RequestUtils(cookies=self._site_cookie, + session=self._session, + timeout=60, + proxies=proxies, + headers=req_headers).get_res(url=url) + if res is not None and res.status_code in (200, 500, 403): + # 如果cloudflare 有防护,尝试使用浏览器仿真 + if under_challenge(res.text): + logger.warn( + f"{self.site_name} 检测到Cloudflare,请更新Cookie和UA") + return "" + if "charset=utf-8" in res.text or "charset=UTF-8" in res.text: + res.encoding = "UTF-8" + else: + res.encoding = res.apparent_encoding + return res.text + + return "" + + @abstractmethod + def _parse_site_page(self, html_text: str): + """ + 解析站点相关信息页面 + :param html_text: + :return: + """ + pass + + @abstractmethod + def _parse_user_base_info(self, html_text: str): + """ + 解析用户基础信息 + :param html_text: + :return: + """ + pass + + def _parse_logged_in(self, html_text): + """ + 解析用户是否已经登陆 + :param html_text: + :return: True/False + """ + logged_in = self.is_logged_in(html_text) + if not logged_in: + self.err_msg = 
"未检测到已登陆,请检查cookies是否过期" + logger.warn(f"{self.site_name} 未登录,跳过后续操作") + + return logged_in + + @abstractmethod + def _parse_user_traffic_info(self, html_text: str): + """ + 解析用户的上传,下载,分享率等信息 + :param html_text: + :return: + """ + pass + + @abstractmethod + def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]: + """ + 解析用户的做种相关信息 + :param html_text: + :param multi_page: 是否多页数据 + :return: 下页地址 + """ + pass + + @abstractmethod + def _parse_user_detail_info(self, html_text: str): + """ + 解析用户的详细信息 + 加入时间/等级/魔力值等 + :param html_text: + :return: + """ + pass + + @abstractmethod + def _parse_message_content(self, html_text): + """ + 解析短消息内容 + :param html_text: + :return: head: message, date: time, content: message content + """ + pass + + @classmethod + def is_logged_in(cls, html_text: str) -> bool: + """ + 判断站点是否已经登陆 + :param html_text: + :return: + """ + html = etree.HTML(html_text) + if not html: + return False + # 存在明显的密码输入框,说明未登录 + if html.xpath("//input[@type='password']"): + return False + # 是否存在登出和用户面板等链接 + xpaths = ['//a[contains(@href, "logout")' + ' or contains(@data-url, "logout")' + ' or contains(@href, "mybonus") ' + ' or contains(@onclick, "logout")' + ' or contains(@href, "usercp")]', + '//form[contains(@action, "logout")]'] + for xpath in xpaths: + if html.xpath(xpath): + return True + user_info_div = html.xpath('//div[@class="user-info-side"]') + if user_info_div: + return True + + return False diff --git a/app/plugins/sitestatistics/siteuserinfo/discuz.py b/app/plugins/sitestatistics/siteuserinfo/discuz.py new file mode 100644 index 00000000..9c67f78f --- /dev/null +++ b/app/plugins/sitestatistics/siteuserinfo/discuz.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +import re +from typing import Optional + +from lxml import etree + +from app.plugins.sitestatistics.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER +from app.utils.string import StringUtils +from app.utils.types import SiteSchema + + +class DiscuzUserInfo(ISiteUserInfo): + schema = SiteSchema.DiscuzX + order = SITE_BASE_ORDER + 10 + + @classmethod + def match(cls, html_text: str) -> bool: + html = etree.HTML(html_text) + if not html: + return False + + printable_text = html.xpath("string(.)") if html else "" + return 'Powered by Discuz!' 
in printable_text + + def _parse_user_base_info(self, html_text: str): + html_text = self._prepare_html_text(html_text) + html = etree.HTML(html_text) + + user_info = html.xpath('//a[contains(@href, "&uid=")]') + if user_info: + user_id_match = re.search(r"&uid=(\d+)", user_info[0].attrib['href']) + if user_id_match and user_id_match.group().strip(): + self.userid = user_id_match.group(1) + self._torrent_seeding_page = f"forum.php?&mod=torrents&cat_5up=on" + self._user_detail_page = user_info[0].attrib['href'] + self.username = user_info[0].text.strip() + + def _parse_site_page(self, html_text: str): + # TODO + pass + + def _parse_user_detail_info(self, html_text: str): + """ + 解析用户额外信息,加入时间,等级 + :param html_text: + :return: + """ + html = etree.HTML(html_text) + if not html: + return None + + # 用户等级 + user_levels_text = html.xpath('//a[contains(@href, "usergroup")]/text()') + if user_levels_text: + self.user_level = user_levels_text[-1].strip() + + # 加入日期 + join_at_text = html.xpath('//li[em[text()="注册时间"]]/text()') + if join_at_text: + self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip()) + + # 分享率,写入ratio而非bonus,积分在下方单独解析 + ratio_text = html.xpath('//li[contains(.//text(), "分享率")]//text()') + if ratio_text: + ratio_match = re.search(r"\(([\d,.]+)\)", ratio_text[0]) + if ratio_match and ratio_match.group(1).strip(): + self.ratio = StringUtils.str_float(ratio_match.group(1)) + + # 积分 + bonus_text = html.xpath('//li[em[text()="积分"]]/text()') + if bonus_text: + self.bonus = StringUtils.str_float(bonus_text[0].strip()) + + # 上传 + upload_text = html.xpath('//li[em[contains(text(),"上传量")]]/text()') + if upload_text: + self.upload = StringUtils.num_filesize(upload_text[0].strip().split('/')[-1]) + + # 下载 + download_text = html.xpath('//li[em[contains(text(),"下载量")]]/text()') + if download_text: + self.download = StringUtils.num_filesize(download_text[0].strip().split('/')[-1]) + + def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]: + """ + 做种相关信息 + :param html_text: + :param multi_page: 是否多页数据 + :return: 下页地址 + """ + html = etree.HTML(html_text) + if not html: + return None + + size_col = 3 + seeders_col = 4 + # 搜索size列 + if html.xpath('//tr[position()=1]/td[.//img[@class="size"] and .//img[@alt="size"]]'): + size_col = len(html.xpath('//tr[position()=1]/td[.//img[@class="size"] ' + 'and .//img[@alt="size"]]/preceding-sibling::td')) + 1 + # 搜索seeders列 + if html.xpath('//tr[position()=1]/td[.//img[@class="seeders"] and .//img[@alt="seeders"]]'): + seeders_col = len(html.xpath('//tr[position()=1]/td[.//img[@class="seeders"] ' + 'and .//img[@alt="seeders"]]/preceding-sibling::td')) + 1 + + page_seeding = 0 + page_seeding_size = 0 + page_seeding_info = [] + seeding_sizes = html.xpath(f'//tr[position()>1]/td[{size_col}]') + seeding_seeders = html.xpath(f'//tr[position()>1]/td[{seeders_col}]//text()') + if seeding_sizes and seeding_seeders: + page_seeding = len(seeding_sizes) + + for i in range(0, len(seeding_sizes)): + size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip()) + seeders = StringUtils.str_int(seeding_seeders[i]) + + page_seeding_size += size + page_seeding_info.append([seeders, size]) + + self.seeding += page_seeding + self.seeding_size += page_seeding_size + self.seeding_info.extend(page_seeding_info) + + # 是否存在下页数据 + next_page = None + next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href') + if next_page_text: + next_page = next_page_text[-1].strip() + + return next_page + + def
_parse_user_traffic_info(self, html_text: str): + pass + + def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]: + return None + + def _parse_message_content(self, html_text): + return None, None, None diff --git a/app/plugins/sitestatistics/siteuserinfo/file_list.py b/app/plugins/sitestatistics/siteuserinfo/file_list.py new file mode 100644 index 00000000..0c4e4d54 --- /dev/null +++ b/app/plugins/sitestatistics/siteuserinfo/file_list.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +import re +from typing import Optional + +from lxml import etree + +from app.plugins.sitestatistics.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER +from app.utils.string import StringUtils +from app.utils.types import SiteSchema + + +class FileListSiteUserInfo(ISiteUserInfo): + schema = SiteSchema.FileList + order = SITE_BASE_ORDER + 50 + + @classmethod + def match(cls, html_text: str) -> bool: + html = etree.HTML(html_text) + if not html: + return False + + printable_text = html.xpath("string(.)") if html else "" + return 'Powered by FileList' in printable_text + + def _parse_site_page(self, html_text: str): + html_text = self._prepare_html_text(html_text) + + user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text) + if user_detail and user_detail.group().strip(): + self._user_detail_page = user_detail.group().strip().lstrip('/') + self.userid = user_detail.group(1) + + self._torrent_seeding_page = f"snatchlist.php?id={self.userid}&action=torrents&type=seeding" + + def _parse_user_base_info(self, html_text: str): + html_text = self._prepare_html_text(html_text) + html = etree.HTML(html_text) + + ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//text()') + if ret: + self.username = str(ret[0]) + + def _parse_user_traffic_info(self, html_text: str): + """ + 上传/下载/分享率 [做种数/魔力值] + :param html_text: + :return: + """ + return + + def _parse_user_detail_info(self, html_text: str): + html_text = self._prepare_html_text(html_text) + html = etree.HTML(html_text) + + upload_html = html.xpath('//table//tr/td[text()="Uploaded"]/following-sibling::td//text()') + if upload_html: + self.upload = StringUtils.num_filesize(upload_html[0]) + download_html = html.xpath('//table//tr/td[text()="Downloaded"]/following-sibling::td//text()') + if download_html: + self.download = StringUtils.num_filesize(download_html[0]) + + self.ratio = 0 if self.download == 0 else self.upload / self.download + + user_level_html = html.xpath('//table//tr/td[text()="Class"]/following-sibling::td//text()') + if user_level_html: + self.user_level = user_level_html[0].strip() + + join_at_html = html.xpath('//table//tr/td[contains(text(), "Join")]/following-sibling::td//text()') + if join_at_html: + self.join_at = StringUtils.unify_datetime_str(join_at_html[0].strip()) + + bonus_html = html.xpath('//a[contains(@href, "shop.php")]') + if bonus_html: + self.bonus = StringUtils.str_float(bonus_html[0].xpath("string(.)").strip()) + pass + + def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]: + """ + 做种相关信息 + :param html_text: + :param multi_page: 是否多页数据 + :return: 下页地址 + """ + html = etree.HTML(html_text) + if not html: + return None + + size_col = 6 + seeders_col = 7 + + page_seeding = 0 + page_seeding_size = 0 + page_seeding_info = [] + seeding_sizes = html.xpath(f'//table/tr[position()>1]/td[{size_col}]') + seeding_seeders = html.xpath(f'//table/tr[position()>1]/td[{seeders_col}]') + if seeding_sizes and 
seeding_seeders: + page_seeding = len(seeding_sizes) + + for i in range(0, len(seeding_sizes)): + size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip()) + seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip()) + + page_seeding_size += size + page_seeding_info.append([seeders, size]) + + self.seeding += page_seeding + self.seeding_size += page_seeding_size + self.seeding_info.extend(page_seeding_info) + + # 是否存在下页数据 + next_page = None + + return next_page + + def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]: + return None + + def _parse_message_content(self, html_text): + return None, None, None diff --git a/app/plugins/sitestatistics/siteuserinfo/gazelle.py b/app/plugins/sitestatistics/siteuserinfo/gazelle.py new file mode 100644 index 00000000..cc53b0ba --- /dev/null +++ b/app/plugins/sitestatistics/siteuserinfo/gazelle.py @@ -0,0 +1,164 @@ +# -*- coding: utf-8 -*- +import re +from typing import Optional + +from lxml import etree + +from app.plugins.sitestatistics.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER +from app.utils.string import StringUtils +from app.utils.types import SiteSchema + + +class GazelleSiteUserInfo(ISiteUserInfo): + schema = SiteSchema.Gazelle + order = SITE_BASE_ORDER + + @classmethod + def match(cls, html_text: str) -> bool: + html = etree.HTML(html_text) + if not html: + return False + + printable_text = html.xpath("string(.)") if html else "" + + return "Powered by Gazelle" in printable_text or "DIC Music" in printable_text + + def _parse_user_base_info(self, html_text: str): + html_text = self._prepare_html_text(html_text) + html = etree.HTML(html_text) + + tmps = html.xpath('//a[contains(@href, "user.php?id=")]') + if tmps: + user_id_match = re.search(r"user.php\?id=(\d+)", tmps[0].attrib['href']) + if user_id_match and user_id_match.group().strip(): + self.userid = user_id_match.group(1) + self._torrent_seeding_page = f"torrents.php?type=seeding&userid={self.userid}" + self._user_detail_page = f"user.php?id={self.userid}" + self.username = tmps[0].text.strip() + + tmps = html.xpath('//*[@id="header-uploaded-value"]/@data-value') + if tmps: + self.upload = StringUtils.num_filesize(tmps[0]) + else: + tmps = html.xpath('//li[@id="stats_seeding"]/span/text()') + if tmps: + self.upload = StringUtils.num_filesize(tmps[0]) + + tmps = html.xpath('//*[@id="header-downloaded-value"]/@data-value') + if tmps: + self.download = StringUtils.num_filesize(tmps[0]) + else: + tmps = html.xpath('//li[@id="stats_leeching"]/span/text()') + if tmps: + self.download = StringUtils.num_filesize(tmps[0]) + + self.ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3) + + tmps = html.xpath('//a[contains(@href, "bonus.php")]/@data-tooltip') + if tmps: + bonus_match = re.search(r"([\d,.]+)", tmps[0]) + if bonus_match and bonus_match.group(1).strip(): + self.bonus = StringUtils.str_float(bonus_match.group(1)) + else: + tmps = html.xpath('//a[contains(@href, "bonus.php")]') + if tmps: + bonus_text = tmps[0].xpath("string(.)") + bonus_match = re.search(r"([\d,.]+)", bonus_text) + if bonus_match and bonus_match.group(1).strip(): + self.bonus = StringUtils.str_float(bonus_match.group(1)) + + def _parse_site_page(self, html_text: str): + # TODO + pass + + def _parse_user_detail_info(self, html_text: str): + """ + 解析用户额外信息,加入时间,等级 + :param html_text: + :return: + """ + html = etree.HTML(html_text) + if not html: + return None + + # 用户等级 + user_levels_text = 
html.xpath('//*[@id="class-value"]/@data-value') + if user_levels_text: + self.user_level = user_levels_text[0].strip() + else: + user_levels_text = html.xpath('//li[contains(text(), "用户等级")]/text()') + if user_levels_text: + self.user_level = user_levels_text[0].split(':')[1].strip() + + # 加入日期 + join_at_text = html.xpath('//*[@id="join-date-value"]/@data-value') + if join_at_text: + self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip()) + else: + join_at_text = html.xpath( + '//div[contains(@class, "box_userinfo_stats")]//li[contains(text(), "加入时间")]/span/text()') + if join_at_text: + self.join_at = StringUtils.unify_datetime_str(join_at_text[0].strip()) + + def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]: + """ + 做种相关信息 + :param html_text: + :param multi_page: 是否多页数据 + :return: 下页地址 + """ + html = etree.HTML(html_text) + if not html: + return None + + size_col = 3 + # 搜索size列 + if html.xpath('//table[contains(@id, "torrent")]//tr[1]/td'): + size_col = len(html.xpath('//table[contains(@id, "torrent")]//tr[1]/td')) - 3 + # 搜索seeders列 + seeders_col = size_col + 2 + + page_seeding = 0 + page_seeding_size = 0 + page_seeding_info = [] + seeding_sizes = html.xpath(f'//table[contains(@id, "torrent")]//tr[position()>1]/td[{size_col}]') + seeding_seeders = html.xpath(f'//table[contains(@id, "torrent")]//tr[position()>1]/td[{seeders_col}]/text()') + if seeding_sizes and seeding_seeders: + page_seeding = len(seeding_sizes) + + for i in range(0, len(seeding_sizes)): + size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip()) + seeders = int(seeding_seeders[i]) + + page_seeding_size += size + page_seeding_info.append([seeders, size]) + + if multi_page: + self.seeding += page_seeding + self.seeding_size += page_seeding_size + self.seeding_info.extend(page_seeding_info) + else: + if not self.seeding: + self.seeding = page_seeding + if not self.seeding_size: + self.seeding_size = page_seeding_size + if not self.seeding_info: + self.seeding_info = page_seeding_info + + # 是否存在下页数据 + next_page = None + next_page_text = html.xpath('//a[contains(.//text(), "Next") or contains(.//text(), "下一页")]/@href') + if next_page_text: + next_page = next_page_text[-1].strip() + + return next_page + + def _parse_user_traffic_info(self, html_text: str): + # TODO + pass + + def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]: + return None + + def _parse_message_content(self, html_text): + return None, None, None diff --git a/app/plugins/sitestatistics/siteuserinfo/ipt_project.py b/app/plugins/sitestatistics/siteuserinfo/ipt_project.py new file mode 100644 index 00000000..26af3202 --- /dev/null +++ b/app/plugins/sitestatistics/siteuserinfo/ipt_project.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +import re +from typing import Optional + +from lxml import etree + +from app.plugins.sitestatistics.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER +from app.utils.string import StringUtils +from app.utils.types import SiteSchema + + +class IptSiteUserInfo(ISiteUserInfo): + schema = SiteSchema.Ipt + order = SITE_BASE_ORDER + 35 + + @classmethod + def match(cls, html_text: str) -> bool: + return 'IPTorrents' in html_text + + def _parse_user_base_info(self, html_text: str): + html_text = self._prepare_html_text(html_text) + html = etree.HTML(html_text) + tmps = html.xpath('//a[contains(@href, "/u/")]//text()') + tmps_id = html.xpath('//a[contains(@href, "/u/")]/@href') + if tmps: + self.username = 
str(tmps[-1]) + if tmps_id: + user_id_match = re.search(r"/u/(\d+)", tmps_id[0]) + if user_id_match and user_id_match.group().strip(): + self.userid = user_id_match.group(1) + self._user_detail_page = f"user.php?u={self.userid}" + self._torrent_seeding_page = f"peers?u={self.userid}" + + tmps = html.xpath('//div[@class = "stats"]/div/div') + if tmps: + self.upload = StringUtils.num_filesize(str(tmps[0].xpath('span/text()')[1]).strip()) + self.download = StringUtils.num_filesize(str(tmps[0].xpath('span/text()')[2]).strip()) + self.seeding = StringUtils.str_int(tmps[0].xpath('a')[2].xpath('text()')[0]) + self.leeching = StringUtils.str_int(tmps[0].xpath('a')[2].xpath('text()')[1]) + self.ratio = StringUtils.str_float(str(tmps[0].xpath('span/text()')[0]).strip().replace('-', '0')) + self.bonus = StringUtils.str_float(tmps[0].xpath('a')[3].xpath('text()')[0]) + + def _parse_site_page(self, html_text: str): + # TODO + pass + + def _parse_user_detail_info(self, html_text: str): + html = etree.HTML(html_text) + if not html: + return + + user_levels_text = html.xpath('//tr/th[text()="Class"]/following-sibling::td[1]/text()') + if user_levels_text: + self.user_level = user_levels_text[0].strip() + + # 加入日期 + join_at_text = html.xpath('//tr/th[text()="Join date"]/following-sibling::td[1]/text()') + if join_at_text: + self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0]) + + def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]: + html = etree.HTML(html_text) + if not html: + return + # seeding start + seeding_end_pos = 3 + if html.xpath('//tr/td[text() = "Leechers"]'): + seeding_end_pos = len(html.xpath('//tr/td[text() = "Leechers"]/../preceding-sibling::tr')) + 1 + seeding_end_pos = seeding_end_pos - 3 + + page_seeding = 0 + page_seeding_size = 0 + seeding_torrents = html.xpath('//tr/td[text() = "Seeders"]/../following-sibling::tr/td[position()=6]/text()') + if seeding_torrents: + page_seeding = seeding_end_pos + for per_size in seeding_torrents[:seeding_end_pos]: + if '(' in per_size and ')' in per_size: + per_size = per_size.split('(')[-1] + per_size = per_size.split(')')[0] + + page_seeding_size += StringUtils.num_filesize(per_size) + + self.seeding = page_seeding + self.seeding_size = page_seeding_size + + def _parse_user_traffic_info(self, html_text: str): + # TODO + pass + + def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]: + return None + + def _parse_message_content(self, html_text): + return None, None, None diff --git a/app/plugins/sitestatistics/siteuserinfo/nexus_php.py b/app/plugins/sitestatistics/siteuserinfo/nexus_php.py new file mode 100644 index 00000000..d9ade094 --- /dev/null +++ b/app/plugins/sitestatistics/siteuserinfo/nexus_php.py @@ -0,0 +1,401 @@ +# -*- coding: utf-8 -*- +import re +from typing import Optional + +from lxml import etree + +from app.log import logger +from app.plugins.sitestatistics.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER +from app.utils.string import StringUtils +from app.utils.types import SiteSchema + + +class NexusPhpSiteUserInfo(ISiteUserInfo): + schema = SiteSchema.NexusPhp + order = SITE_BASE_ORDER * 2 + + @classmethod + def match(cls, html_text: str) -> bool: + """ + 默认使用NexusPhp解析 + :param html_text: + :return: + """ + return True + + def _parse_site_page(self, html_text: str): + html_text = self._prepare_html_text(html_text) + + user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text) + if user_detail and 
user_detail.group().strip(): + self._user_detail_page = user_detail.group().strip().lstrip('/') + self.userid = user_detail.group(1) + self._torrent_seeding_page = f"getusertorrentlistajax.php?userid={self.userid}&type=seeding" + else: + user_detail = re.search(r"(userdetails)", html_text) + if user_detail and user_detail.group().strip(): + self._user_detail_page = user_detail.group().strip().lstrip('/') + self.userid = None + self._torrent_seeding_page = None + + def _parse_message_unread(self, html_text): + """ + 解析未读短消息数量 + :param html_text: + :return: + """ + html = etree.HTML(html_text) + if not html: + return + + message_labels = html.xpath('//a[@href="messages.php"]/..') + message_labels.extend(html.xpath('//a[contains(@href, "messages.php")]/..')) + if message_labels: + message_text = message_labels[0].xpath("string(.)") + + logger.debug(f"{self.site_name} 消息原始信息 {message_text}") + message_unread_match = re.findall(r"[^Date](信息箱\s*|\(|你有\xa0)(\d+)", message_text) + + if message_unread_match and len(message_unread_match[-1]) == 2: + self.message_unread = StringUtils.str_int(message_unread_match[-1][1]) + elif message_text.isdigit(): + self.message_unread = StringUtils.str_int(message_text) + + def _parse_user_base_info(self, html_text: str): + # 合并解析,减少额外请求调用 + self.__parse_user_traffic_info(html_text) + self._user_traffic_page = None + + self._parse_message_unread(html_text) + + html = etree.HTML(html_text) + if not html: + return + + ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//b//text()') + if ret: + self.username = str(ret[0]) + return + ret = html.xpath(f'//a[contains(@href, "userdetails") and contains(@href, "{self.userid}")]//text()') + if ret: + self.username = str(ret[0]) + + ret = html.xpath('//a[contains(@href, "userdetails")]//strong//text()') + if ret: + self.username = str(ret[0]) + return + + def __parse_user_traffic_info(self, html_text): + html_text = self._prepare_html_text(html_text) + upload_match = re.search(r"[^总]上[传傳]量?[::_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text, + re.IGNORECASE) + self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0 + download_match = re.search(r"[^总子影力]下[载載]量?[::_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text, + re.IGNORECASE) + self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0 + ratio_match = re.search(r"分享率[::_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)", html_text) + # 计算分享率 + calc_ratio = 0.0 if self.download <= 0.0 else round(self.upload / self.download, 3) + # 优先使用页面上的分享率 + self.ratio = StringUtils.str_float(ratio_match.group(1)) if ( + ratio_match and ratio_match.group(1).strip()) else calc_ratio + leeching_match = re.search(r"(Torrents leeching|下载中)[\u4E00-\u9FA5\D\s]+(\d+)[\s\S]+<", html_text) + self.leeching = StringUtils.str_int(leeching_match.group(2)) if leeching_match and leeching_match.group( + 2).strip() else 0 + html = etree.HTML(html_text) + has_ucoin, self.bonus = self.__parse_ucoin(html) + if has_ucoin: + return + tmps = html.xpath('//a[contains(@href,"mybonus")]/text()') if html else None + if tmps: + bonus_text = str(tmps[0]).strip() + bonus_match = re.search(r"([\d,.]+)", bonus_text) + if bonus_match and bonus_match.group(1).strip(): + self.bonus = StringUtils.str_float(bonus_match.group(1)) + return + bonus_match = re.search(r"mybonus.[\[\]::<>/a-zA-Z_\-=\"'\s#;.(使用魔力值豆]+\s*([\d,.]+)[<()&\s]", html_text) + try: + if bonus_match and bonus_match.group(1).strip(): + self.bonus 
= StringUtils.str_float(bonus_match.group(1)) + return + bonus_match = re.search(r"[魔力值|\]][\[\]::<>/a-zA-Z_\-=\"'\s#;]+\s*([\d,.]+|\"[\d,.]+\")[<>()&\s]", + html_text, + flags=re.S) + if bonus_match and bonus_match.group(1).strip(): + self.bonus = StringUtils.str_float(bonus_match.group(1).strip('"')) + except Exception as err: + logger.error(f"{self.site_name} 解析魔力值出错, 错误信息: {err}") + + @staticmethod + def __parse_ucoin(html): + """ + 解析ucoin, 统一转换为铜币 + :param html: + :return: + """ + if html: + gold, silver, copper = None, None, None + + golds = html.xpath('//span[@class = "ucoin-symbol ucoin-gold"]//text()') + if golds: + gold = StringUtils.str_float(str(golds[-1])) + silvers = html.xpath('//span[@class = "ucoin-symbol ucoin-silver"]//text()') + if silvers: + silver = StringUtils.str_float(str(silvers[-1])) + coppers = html.xpath('//span[@class = "ucoin-symbol ucoin-copper"]//text()') + if coppers: + copper = StringUtils.str_float(str(coppers[-1])) + if gold or silver or copper: + gold = gold if gold else 0 + silver = silver if silver else 0 + copper = copper if copper else 0 + return True, gold * 100 * 100 + silver * 100 + copper + return False, 0.0 + + def _parse_user_traffic_info(self, html_text: str): + """ + 上传/下载/分享率 [做种数/魔力值] + :param html_text: + :return: + """ + pass + + def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]: + """ + 做种相关信息 + :param html_text: + :param multi_page: 是否多页数据 + :return: 下页地址 + """ + html = etree.HTML(str(html_text).replace(r'\/', '/')) + if not html: + return None + + # 首页存在扩展链接,使用扩展链接 + seeding_url_text = html.xpath('//a[contains(@href,"torrents.php") ' + 'and contains(@href,"seeding")]/@href') + if multi_page is False and seeding_url_text and seeding_url_text[0].strip(): + self._torrent_seeding_page = seeding_url_text[0].strip() + return self._torrent_seeding_page + + size_col = 3 + seeders_col = 4 + # 搜索size列 + size_col_xpath = '//tr[position()=1]/' \ + 'td[(img[@class="size"] and img[@alt="size"])' \ + ' or (text() = "大小")' \ + ' or (a/img[@class="size" and @alt="size"])]' + if html.xpath(size_col_xpath): + size_col = len(html.xpath(f'{size_col_xpath}/preceding-sibling::td')) + 1 + # 搜索seeders列 + seeders_col_xpath = '//tr[position()=1]/' \ + 'td[(img[@class="seeders"] and img[@alt="seeders"])' \ + ' or (text() = "在做种")' \ + ' or (a/img[@class="seeders" and @alt="seeders"])]' + if html.xpath(seeders_col_xpath): + seeders_col = len(html.xpath(f'{seeders_col_xpath}/preceding-sibling::td')) + 1 + + page_seeding = 0 + page_seeding_size = 0 + page_seeding_info = [] + # 如果 table class="torrents",则增加table[@class="torrents"] + table_class = '//table[@class="torrents"]' if html.xpath('//table[@class="torrents"]') else '' + seeding_sizes = html.xpath(f'{table_class}//tr[position()>1]/td[{size_col}]') + seeding_seeders = html.xpath(f'{table_class}//tr[position()>1]/td[{seeders_col}]/b/a/text()') + if not seeding_seeders: + seeding_seeders = html.xpath(f'{table_class}//tr[position()>1]/td[{seeders_col}]//text()') + if seeding_sizes and seeding_seeders: + page_seeding = len(seeding_sizes) + + for i in range(0, len(seeding_sizes)): + size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip()) + seeders = StringUtils.str_int(seeding_seeders[i]) + + page_seeding_size += size + page_seeding_info.append([seeders, size]) + + self.seeding += page_seeding + self.seeding_size += page_seeding_size + self.seeding_info.extend(page_seeding_info) + + # 是否存在下页数据 + next_page = None + next_page_text = 
html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href') + if next_page_text: + next_page = next_page_text[-1].strip() + # fix up page url + if self.userid not in next_page: + next_page = f'{next_page}&userid={self.userid}&type=seeding' + + return next_page + + def _parse_user_detail_info(self, html_text: str): + """ + 解析用户额外信息,加入时间,等级 + :param html_text: + :return: + """ + html = etree.HTML(html_text) + if not html: + return + + self.__get_user_level(html) + + self.__fixup_traffic_info(html) + + # 加入日期 + join_at_text = html.xpath( + '//tr/td[text()="加入日期" or text()="注册日期" or *[text()="加入日期"]]/following-sibling::td[1]//text()' + '|//div/b[text()="加入日期"]/../text()') + if join_at_text: + self.join_at = StringUtils.unify_datetime_str(join_at_text[0].split(' (')[0].strip()) + + # 做种体积 & 做种数 + # seeding 页面获取不到的话,此处再获取一次 + seeding_sizes = html.xpath('//tr/td[text()="当前上传"]/following-sibling::td[1]//' + 'table[tr[1][td[4 and text()="尺寸"]]]//tr[position()>1]/td[4]') + seeding_seeders = html.xpath('//tr/td[text()="当前上传"]/following-sibling::td[1]//' + 'table[tr[1][td[5 and text()="做种者"]]]//tr[position()>1]/td[5]//text()') + tmp_seeding = len(seeding_sizes) + tmp_seeding_size = 0 + tmp_seeding_info = [] + for i in range(0, len(seeding_sizes)): + size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip()) + seeders = StringUtils.str_int(seeding_seeders[i]) + + tmp_seeding_size += size + tmp_seeding_info.append([seeders, size]) + + if not self.seeding_size: + self.seeding_size = tmp_seeding_size + if not self.seeding: + self.seeding = tmp_seeding + if not self.seeding_info: + self.seeding_info = tmp_seeding_info + + seeding_sizes = html.xpath('//tr/td[text()="做种统计"]/following-sibling::td[1]//text()') + if seeding_sizes: + seeding_match = re.search(r"总做种数:\s+(\d+)", seeding_sizes[0], re.IGNORECASE) + seeding_size_match = re.search(r"总做种体积:\s+([\d,.\s]+[KMGTPI]*B)", seeding_sizes[0], re.IGNORECASE) + tmp_seeding = StringUtils.str_int(seeding_match.group(1)) if ( + seeding_match and seeding_match.group(1)) else 0 + tmp_seeding_size = StringUtils.num_filesize( + seeding_size_match.group(1).strip()) if seeding_size_match else 0 + if not self.seeding_size: + self.seeding_size = tmp_seeding_size + if not self.seeding: + self.seeding = tmp_seeding + + self.__fixup_torrent_seeding_page(html) + + def __fixup_torrent_seeding_page(self, html): + """ + 修正种子页面链接 + :param html: + :return: + """ + # 单独的种子页面 + seeding_url_text = html.xpath('//a[contains(@href,"getusertorrentlist.php") ' + 'and contains(@href,"seeding")]/@href') + if seeding_url_text: + self._torrent_seeding_page = seeding_url_text[0].strip() + # 从JS调用种获取用户ID + seeding_url_text = html.xpath('//a[contains(@href, "javascript: getusertorrentlistajax") ' + 'and contains(@href,"seeding")]/@href') + csrf_text = html.xpath('//meta[@name="x-csrf"]/@content') + if not self._torrent_seeding_page and seeding_url_text: + user_js = re.search(r"javascript: getusertorrentlistajax\(\s*'(\d+)", seeding_url_text[0]) + if user_js and user_js.group(1).strip(): + self.userid = user_js.group(1).strip() + self._torrent_seeding_page = f"getusertorrentlistajax.php?userid={self.userid}&type=seeding" + elif seeding_url_text and csrf_text: + if csrf_text[0].strip(): + self._torrent_seeding_page \ + = f"ajax_getusertorrentlist.php" + self._torrent_seeding_params = {'userid': self.userid, 'type': 'seeding', 'csrf': csrf_text[0].strip()} + + # 分类做种模式 + # 临时屏蔽 + # seeding_url_text = html.xpath('//tr/td[text()="当前做种"]/following-sibling::td[1]' 
+    def __get_user_level(self, html):
+        # User level: read the cell on the same row; for image-style levels take
+        # the title attribute, otherwise fall back to the text content
+        user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级" or *[text()="等级"]]/'
+                                      'following-sibling::td[1]/img[1]/@title')
+        if user_levels_text:
+            self.user_level = user_levels_text[0].strip()
+            return
+
+        user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级"]/'
+                                      'following-sibling::td[1 and not(img)]'
+                                      '|//tr/td[text()="等級" or text()="等级"]/'
+                                      'following-sibling::td[1 and img[not(@title)]]')
+        if user_levels_text:
+            self.user_level = user_levels_text[0].xpath("string(.)").strip()
+            return
+
+        user_levels_text = html.xpath('//tr/td[text()="等級" or text()="等级"]/'
+                                      'following-sibling::td[1]')
+        if user_levels_text:
+            self.user_level = user_levels_text[0].xpath("string(.)").strip()
+            return
+
+        user_levels_text = html.xpath('//a[contains(@href, "userdetails")]/text()')
+        if not self.user_level and user_levels_text:
+            for user_level_text in user_levels_text:
+                user_level_match = re.search(r"\[(.*)]", user_level_text)
+                if user_level_match and user_level_match.group(1).strip():
+                    self.user_level = user_level_match.group(1).strip()
+                    break
+
+    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
+        html = etree.HTML(html_text)
+        if not html:
+            return None
+
+        message_links = html.xpath('//tr[not(./td/img[@alt="Read"])]/td/a[contains(@href, "viewmessage")]/@href')
+        msg_links.extend(message_links)
+        # Is there a next page?
+        next_page = None
+        next_page_text = html.xpath('//a[contains(.//text(), "下一页") or contains(.//text(), "下一頁")]/@href')
+        if next_page_text:
+            next_page = next_page_text[-1].strip()
+
+        return next_page
+
+    def _parse_message_content(self, html_text):
+        html = etree.HTML(html_text)
+        if not html:
+            return None, None, None
+        # Message title
+        message_head_text = None
+        message_head = html.xpath('//h1/text()'
+                                  '|//div[@class="layui-card-header"]/span[1]/text()')
+        if message_head:
+            message_head_text = message_head[-1].strip()
+
+        # Message date
+        message_date_text = None
+        message_date = html.xpath('//h1/following-sibling::table[.//tr/td[@class="colhead"]]//tr[2]/td[2]'
+                                  '|//div[@class="layui-card-header"]/span[2]/span[2]')
+        if message_date:
+            message_date_text = message_date[0].xpath("string(.)").strip()
+
+        # Message body
+        message_content_text = None
+        message_content = html.xpath('//h1/following-sibling::table[.//tr/td[@class="colhead"]]//tr[3]/td'
+                                     '|//div[contains(@class,"layui-card-body")]')
+        if message_content:
+            message_content_text = message_content[0].xpath("string(.)").strip()
+
+        return message_head_text, message_date_text, message_content_text
+
+    def __fixup_traffic_info(self, html):
+        # fix up bonus
+        if not self.bonus:
+            bonus_text = html.xpath('//tr/td[text()="魔力值" or text()="猫粮"]/following-sibling::td[1]/text()')
+            if bonus_text:
+                self.bonus = StringUtils.str_float(bonus_text[0].strip())
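The NexusPHP parser above works through a chain of progressively weaker XPath queries: image badge title first, then plain cell text, then a bracketed level name in profile links. A minimal, self-contained sketch of that fallback idea, using invented sample HTML rather than a real site page:

```python
# Sketch of the __get_user_level fallback chain; the HTML below is invented.
from lxml import etree

SAMPLE = """
<table>
  <tr><td>等级</td><td><img src="vip.png" title="VIP"/></td></tr>
</table>
"""

html = etree.HTML(SAMPLE)
# 1) image badge: the level name lives in the img @title
level = html.xpath('//tr/td[text()="等級" or text()="等级"]/following-sibling::td[1]/img[1]/@title')
if not level:
    # 2) plain text cell: flatten it with string(.)
    cell = html.xpath('//tr/td[text()="等級" or text()="等级"]/following-sibling::td[1]')
    level = [cell[0].xpath("string(.)").strip()] if cell else []
print(level[0] if level else None)  # -> VIP
```

Ordering matters: the `@title` query is the most precise, so the text-based queries only run when it yields nothing.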
diff --git a/app/plugins/sitestatistics/siteuserinfo/nexus_project.py b/app/plugins/sitestatistics/siteuserinfo/nexus_project.py
new file mode 100644
index 00000000..54c49fe5
--- /dev/null
+++ b/app/plugins/sitestatistics/siteuserinfo/nexus_project.py
@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+import re
+
+from app.plugins.sitestatistics.siteuserinfo import SITE_BASE_ORDER
+from app.plugins.sitestatistics.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
+from app.utils.types import SiteSchema
+
+
+class NexusProjectSiteUserInfo(NexusPhpSiteUserInfo):
+    schema = SiteSchema.NexusProject
+    order = SITE_BASE_ORDER + 25
+
+    @classmethod
+    def match(cls, html_text: str) -> bool:
+        return 'Nexus Project' in html_text
+
+    def _parse_site_page(self, html_text: str):
+        html_text = self._prepare_html_text(html_text)
+
+        user_detail = re.search(r"userdetails.php\?id=(\d+)", html_text)
+        if user_detail and user_detail.group().strip():
+            self._user_detail_page = user_detail.group().strip().lstrip('/')
+            self.userid = user_detail.group(1)
+
+        self._torrent_seeding_page = f"viewusertorrents.php?id={self.userid}&show=seeding"
diff --git a/app/plugins/sitestatistics/siteuserinfo/nexus_rabbit.py b/app/plugins/sitestatistics/siteuserinfo/nexus_rabbit.py
new file mode 100644
index 00000000..07f865ef
--- /dev/null
+++ b/app/plugins/sitestatistics/siteuserinfo/nexus_rabbit.py
@@ -0,0 +1,58 @@
+# -*- coding: utf-8 -*-
+import json
+from typing import Optional
+
+from lxml import etree
+
+from app.log import logger
+from app.plugins.sitestatistics.siteuserinfo import SITE_BASE_ORDER
+from app.plugins.sitestatistics.siteuserinfo.nexus_php import NexusPhpSiteUserInfo
+from app.utils.types import SiteSchema
+
+
+class NexusRabbitSiteUserInfo(NexusPhpSiteUserInfo):
+    schema = SiteSchema.NexusRabbit
+    order = SITE_BASE_ORDER + 5
+
+    @classmethod
+    def match(cls, html_text: str) -> bool:
+        html = etree.HTML(html_text)
+        if not html:
+            return False
+
+        printable_text = html.xpath("string(.)")
+        return 'Style by Rabbit' in printable_text
+
+    def _parse_site_page(self, html_text: str):
+        super()._parse_site_page(html_text)
+        self._torrent_seeding_page = f"getusertorrentlistajax.php?page=1&limit=5000000&type=seeding&uid={self.userid}"
+        self._torrent_seeding_headers = {"Accept": "application/json, text/javascript, */*; q=0.01"}
+
+    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
+        """
+        Seeding info
+        :param html_text:
+        :param multi_page: whether the data spans multiple pages
+        :return: url of the next page
+        """
+        try:
+            torrents = json.loads(html_text).get('data') or []
+        except Exception as e:
+            logger.error(f"解析做种信息失败: {e}")
+            return None
+
+        page_seeding_size = 0
+        page_seeding_info = []
+
+        page_seeding = len(torrents)
+        for torrent in torrents:
+            seeders = int(torrent.get('seeders', 0))
+            size = int(torrent.get('size', 0))
+            page_seeding_size += size
+
+            page_seeding_info.append([seeders, size])
+
+        self.seeding += page_seeding
+        self.seeding_size += page_seeding_size
+        self.seeding_info.extend(page_seeding_info)
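NexusRabbit is the outlier among the NexusPHP variants: its seeding list arrives as JSON from an ajax endpoint rather than as HTML. A hedged sketch of the accumulation logic above, with an invented payload shaped like the `{"data": [...]}` response the parser expects:

```python
# Standalone sketch of the JSON seeding-list handling; the payload is invented.
import json

payload = '{"data": [{"seeders": 3, "size": 1073741824}, {"seeders": 1, "size": 2147483648}]}'
torrents = json.loads(payload).get("data") or []  # guard against a missing/null "data" key

seeding = len(torrents)
seeding_size = sum(int(t.get("size", 0)) for t in torrents)
seeding_info = [[int(t.get("seeders", 0)), int(t.get("size", 0))] for t in torrents]

print(seeding, seeding_size)  # -> 2 3221225472
```

The `or []` guard matters: without it, a response missing `data` would crash `len()` instead of recording zero seeding.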
diff --git a/app/plugins/sitestatistics/siteuserinfo/small_horse.py b/app/plugins/sitestatistics/siteuserinfo/small_horse.py
new file mode 100644
index 00000000..5a4cf8ff
--- /dev/null
+++ b/app/plugins/sitestatistics/siteuserinfo/small_horse.py
@@ -0,0 +1,111 @@
+# -*- coding: utf-8 -*-
+import re
+from typing import Optional
+
+from lxml import etree
+
+from app.plugins.sitestatistics.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER
+from app.utils.string import StringUtils
+from app.utils.types import SiteSchema
+
+
+class SmallHorseSiteUserInfo(ISiteUserInfo):
+    schema = SiteSchema.SmallHorse
+    order = SITE_BASE_ORDER + 30
+
+    @classmethod
+    def match(cls, html_text: str) -> bool:
+        return 'Small Horse' in html_text
+
+    def _parse_site_page(self, html_text: str):
+        html_text = self._prepare_html_text(html_text)
+
+        user_detail = re.search(r"user.php\?id=(\d+)", html_text)
+        if user_detail and user_detail.group().strip():
+            self._user_detail_page = user_detail.group().strip().lstrip('/')
+            self.userid = user_detail.group(1)
+            self._torrent_seeding_page = f"torrents.php?type=seeding&userid={self.userid}"
+            self._user_traffic_page = f"user.php?id={self.userid}"
+
+    def _parse_user_base_info(self, html_text: str):
+        html_text = self._prepare_html_text(html_text)
+        html = etree.HTML(html_text)
+        ret = html.xpath('//a[contains(@href, "user.php")]//text()')
+        if ret:
+            self.username = str(ret[0])
+
+    def _parse_user_traffic_info(self, html_text: str):
+        """
+        Upload / download / share ratio [seeding count / bonus points]
+        :param html_text:
+        :return:
+        """
+        html_text = self._prepare_html_text(html_text)
+        html = etree.HTML(html_text)
+        tmps = html.xpath('//ul[@class = "stats nobullet"]')
+        if tmps:
+            if tmps[1].xpath("li") and tmps[1].xpath("li")[0].xpath("span//text()"):
+                self.join_at = StringUtils.unify_datetime_str(tmps[1].xpath("li")[0].xpath("span//text()")[0])
+            self.upload = StringUtils.num_filesize(
+                str(tmps[1].xpath("li")[2].xpath("text()")[0]).split(":")[1].strip())
+            self.download = StringUtils.num_filesize(
+                str(tmps[1].xpath("li")[3].xpath("text()")[0]).split(":")[1].strip())
+            if tmps[1].xpath("li")[4].xpath("span//text()"):
+                self.ratio = StringUtils.str_float(
+                    str(tmps[1].xpath("li")[4].xpath("span//text()")[0]).replace('∞', '0'))
+            else:
+                self.ratio = StringUtils.str_float(str(tmps[1].xpath("li")[5].xpath("text()")[0]).split(":")[1])
+            self.bonus = StringUtils.str_float(str(tmps[1].xpath("li")[5].xpath("text()")[0]).split(":")[1])
+            self.user_level = str(tmps[3].xpath("li")[0].xpath("text()")[0]).split(":")[1].strip()
+            self.leeching = StringUtils.str_int(
+                (tmps[4].xpath("li")[6].xpath("text()")[0]).split(":")[1].replace("[", ""))
+
+    def _parse_user_detail_info(self, html_text: str):
+        pass
+
+    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
+        """
+        Seeding info
+        :param html_text:
+        :param multi_page: whether the data spans multiple pages
+        :return: url of the next page
+        """
+        html = etree.HTML(html_text)
+        if not html:
+            return None
+
+        size_col = 6
+        seeders_col = 8
+
+        page_seeding = 0
+        page_seeding_size = 0
+        page_seeding_info = []
+        seeding_sizes = html.xpath(f'//table[@id="torrent_table"]//tr[position()>1]/td[{size_col}]')
+        seeding_seeders = html.xpath(f'//table[@id="torrent_table"]//tr[position()>1]/td[{seeders_col}]')
+        if seeding_sizes and seeding_seeders:
+            page_seeding = len(seeding_sizes)
+
+            for i in range(0, len(seeding_sizes)):
+                size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
+                seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
+
+                page_seeding_size += size
+                page_seeding_info.append([seeders, size])
+
+        self.seeding += page_seeding
+        self.seeding_size += page_seeding_size
+        self.seeding_info.extend(page_seeding_info)
+
+        # Is there a next page?
+        next_page = None
+        next_pages = html.xpath('//ul[@class="pagination"]/li[contains(@class,"active")]/following-sibling::li')
+        if next_pages and len(next_pages) > 1:
+            page_num = next_pages[0].xpath("string(.)").strip()
+            if page_num.isdigit():
+                next_page = f"{self._torrent_seeding_page}&page={page_num}"
+
+        return next_page
+
+    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
+        return None
+
+    def _parse_message_content(self, html_text):
+        return None, None, None
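The next-page detection used here (and again in the Unit3d parser further down) reads the sibling of the `li.active` pagination entry: if the sibling is a digit, there is another page; if only a trailing "Next"/"Last" stub remains, iteration stops. A self-contained sketch with invented markup:

```python
# Sketch of the ul.pagination next-page check; sample HTML and URL are invented.
from lxml import etree

SAMPLE = """
<ul class="pagination">
  <li><a>1</a></li>
  <li class="active"><a>2</a></li>
  <li><a>3</a></li>
  <li><a>Next</a></li>
</ul>
"""

html = etree.HTML(SAMPLE)
siblings = html.xpath('//ul[@class="pagination"]/li[contains(@class,"active")]/following-sibling::li')
next_page = None
if siblings and len(siblings) > 1:  # more than just a trailing "Next"/"Last" stub
    page_num = siblings[0].xpath("string(.)").strip()
    if page_num.isdigit():
        next_page = f"torrents.php?type=seeding&userid=1&page={page_num}"  # illustrative URL
print(next_page)  # -> torrents.php?type=seeding&userid=1&page=3
```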
diff --git a/app/plugins/sitestatistics/siteuserinfo/tnode.py b/app/plugins/sitestatistics/siteuserinfo/tnode.py
new file mode 100644
index 00000000..3ca99e3f
--- /dev/null
+++ b/app/plugins/sitestatistics/siteuserinfo/tnode.py
@@ -0,0 +1,104 @@
+# -*- coding: utf-8 -*-
+import json
+import re
+from typing import Optional
+
+from app.plugins.sitestatistics.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER
+from app.utils.string import StringUtils
+from app.utils.types import SiteSchema
+
+
+class TNodeSiteUserInfo(ISiteUserInfo):
+    schema = SiteSchema.TNode
+    order = SITE_BASE_ORDER + 60
+
+    @classmethod
+    def match(cls, html_text: str) -> bool:
+        return 'Powered By TNode' in html_text
+
+    def _parse_site_page(self, html_text: str):
+        html_text = self._prepare_html_text(html_text)
+
+        # <meta name="x-csrf-token" content="...">
+        csrf_token = re.search(r'<meta name="x-csrf-token" content="(.+?)"', html_text)
+        if csrf_token:
+            self._addition_headers = {'X-CSRF-TOKEN': csrf_token.group(1)}
+            self._user_detail_page = "api/user/getMainInfo"
+            self._torrent_seeding_page = "api/user/listTorrentActivity?id=&type=seeding&page=1&size=20000"
+
+    def _parse_logged_in(self, html_text):
+        """
+        Check login state by looking for user info.
+        Skipped for now; to be refined later.
+        :param html_text:
+        :return:
+        """
+        return True
+
+    def _parse_user_base_info(self, html_text: str):
+        self.username = self.userid
+
+    def _parse_user_traffic_info(self, html_text: str):
+        pass
+
+    def _parse_user_detail_info(self, html_text: str):
+        detail = json.loads(html_text)
+        if detail.get("status") != 200:
+            return
+
+        user_info = detail.get("data", {})
+        self.userid = user_info.get("id")
+        self.username = user_info.get("username")
+        self.user_level = user_info.get("class", {}).get("name")
+        self.join_at = user_info.get("regTime", 0)
+        self.join_at = StringUtils.unify_datetime_str(str(self.join_at))
+
+        self.upload = user_info.get("upload")
+        self.download = user_info.get("download")
+        self.ratio = 0 if self.download <= 0 else round(self.upload / self.download, 3)
+        self.bonus = user_info.get("bonus")
+
+        self.message_unread = user_info.get("unreadAdmin", 0) + user_info.get("unreadInbox", 0) + user_info.get(
+            "unreadSystem", 0)
+
+    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
+        """
+        Parse the user's seeding info
+        """
+        seeding_info = json.loads(html_text)
+        if seeding_info.get("status") != 200:
+            return None
+
+        torrents = seeding_info.get("data", {}).get("torrents", [])
+
+        page_seeding_size = 0
+        page_seeding_info = []
+        for torrent in torrents:
+            size = torrent.get("size", 0)
+            seeders = torrent.get("seeding", 0)
+
+            page_seeding_size += size
+            page_seeding_info.append([seeders, size])
+
+        self.seeding += len(torrents)
+        self.seeding_size += page_seeding_size
+        self.seeding_info.extend(page_seeding_info)
+
+        # Is there a next page?
+        next_page = None
+
+        return next_page
+
+    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
+        return None
+
+    def _parse_message_content(self, html_text):
+        """
+        System messages  api/message/listSystem?page=1&size=20
+        Inbox messages   api/message/listInbox?page=1&size=20
+        Admin messages   api/message/listAdmin?page=1&size=20
+        :param html_text:
+        :return:
+        """
+        return None, None, None
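Note that the regex in `_parse_site_page` arrived in this hunk with an empty body (the HTML inside both the comment and the pattern was stripped in transit); the `<meta name="x-csrf-token" content="...">` pattern shown above is reconstructed from the `X-CSRF-TOKEN` header it feeds and should be read as an inferred repair, not verbatim upstream code. A standalone check of that reconstruction:

```python
# Exercise the (reconstructed) CSRF meta-tag pattern; sample HTML is invented.
import re

SAMPLE = '<head><meta name="x-csrf-token" content="abc123"></head>'
m = re.search(r'<meta name="x-csrf-token" content="(.+?)"', SAMPLE)
headers = {"X-CSRF-TOKEN": m.group(1)} if m else {}
print(headers)  # -> {'X-CSRF-TOKEN': 'abc123'}
```

Every later TNode API call (`getMainInfo`, `listTorrentActivity`) depends on this header, which is why the parser extracts it before setting the detail and seeding endpoints.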
diff --git a/app/plugins/sitestatistics/siteuserinfo/torrent_leech.py b/app/plugins/sitestatistics/siteuserinfo/torrent_leech.py
new file mode 100644
index 00000000..72431d13
--- /dev/null
+++ b/app/plugins/sitestatistics/siteuserinfo/torrent_leech.py
@@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+import re
+from typing import Optional
+
+from lxml import etree
+
+from app.plugins.sitestatistics.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER
+from app.utils.string import StringUtils
+from app.utils.types import SiteSchema
+
+
+class TorrentLeechSiteUserInfo(ISiteUserInfo):
+    schema = SiteSchema.TorrentLeech
+    order = SITE_BASE_ORDER + 40
+
+    @classmethod
+    def match(cls, html_text: str) -> bool:
+        return 'TorrentLeech' in html_text
+
+    def _parse_site_page(self, html_text: str):
+        html_text = self._prepare_html_text(html_text)
+
+        user_detail = re.search(r"/profile/([^/]+)/", html_text)
+        if user_detail and user_detail.group().strip():
+            self._user_detail_page = user_detail.group().strip().lstrip('/')
+            self.userid = user_detail.group(1)
+            self._user_traffic_page = f"profile/{self.userid}/view"
+            self._torrent_seeding_page = f"profile/{self.userid}/seeding"
+
+    def _parse_user_base_info(self, html_text: str):
+        self.username = self.userid
+
+    def _parse_user_traffic_info(self, html_text: str):
+        """
+        Upload / download / share ratio [seeding count / bonus points]
+        :param html_text:
+        :return:
+        """
+        html_text = self._prepare_html_text(html_text)
+        html = etree.HTML(html_text)
+        upload_html = html.xpath('//div[contains(@class,"profile-uploaded")]//span/text()')
+        if upload_html:
+            self.upload = StringUtils.num_filesize(upload_html[0])
+        download_html = html.xpath('//div[contains(@class,"profile-downloaded")]//span/text()')
+        if download_html:
+            self.download = StringUtils.num_filesize(download_html[0])
+        ratio_html = html.xpath('//div[contains(@class,"profile-ratio")]//span/text()')
+        if ratio_html:
+            self.ratio = StringUtils.str_float(ratio_html[0].replace('∞', '0'))
+
+        user_level_html = html.xpath('//table[contains(@class, "profileViewTable")]'
+                                     '//tr/td[text()="Class"]/following-sibling::td/text()')
+        if user_level_html:
+            self.user_level = user_level_html[0].strip()
+
+        join_at_html = html.xpath('//table[contains(@class, "profileViewTable")]'
+                                  '//tr/td[text()="Registration date"]/following-sibling::td/text()')
+        if join_at_html:
+            self.join_at = StringUtils.unify_datetime_str(join_at_html[0].strip())
+
+        bonus_html = html.xpath('//span[contains(@class, "total-TL-points")]/text()')
+        if bonus_html:
+            self.bonus = StringUtils.str_float(bonus_html[0].strip())
+
+    def _parse_user_detail_info(self, html_text: str):
+        pass
+
+    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
+        """
+        Seeding info
+        :param html_text:
+        :param multi_page: whether the data spans multiple pages
+        :return: url of the next page
+        """
+        html = etree.HTML(html_text)
+        if not html:
+            return None
+
+        size_col = 2
+        seeders_col = 7
+
+        page_seeding = 0
+        page_seeding_size = 0
+        page_seeding_info = []
+        seeding_sizes = html.xpath(f'//tbody/tr/td[{size_col}]')
+        seeding_seeders = html.xpath(f'//tbody/tr/td[{seeders_col}]/text()')
+        if seeding_sizes and seeding_seeders:
+            page_seeding = len(seeding_sizes)
+
+            for i in range(0, len(seeding_sizes)):
+                size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
+                seeders = StringUtils.str_int(seeding_seeders[i])
+
+                page_seeding_size += size
+                page_seeding_info.append([seeders, size])
+
+        self.seeding += page_seeding
+        self.seeding_size += page_seeding_size
+        self.seeding_info.extend(page_seeding_info)
+
+        # Is there a next page?
+        next_page = None
+
+        return next_page
+
+    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
+        return None
+
+    def _parse_message_content(self, html_text):
+        return None, None, None
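Unlike the NexusPHP family, TorrentLeech keys everything off a `/profile/<name>/` link, so the username doubles as the user id and the traffic and seeding URLs are derived from a single regex hit. A hedged sketch of that derivation with an invented page fragment:

```python
# Sketch of the profile-URL discovery above; the HTML and username are invented.
import re

SAMPLE = '<a href="/profile/alice/view">profile</a>'
m = re.search(r"/profile/([^/]+)/", SAMPLE)
if m:
    userid = m.group(1)                                 # "alice"
    user_detail_page = m.group().strip().lstrip('/')    # "profile/alice/"
    user_traffic_page = f"profile/{userid}/view"
    torrent_seeding_page = f"profile/{userid}/seeding"
    print(user_traffic_page, torrent_seeding_page)
```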
diff --git a/app/plugins/sitestatistics/siteuserinfo/unit3d.py b/app/plugins/sitestatistics/siteuserinfo/unit3d.py
new file mode 100644
index 00000000..a03430ac
--- /dev/null
+++ b/app/plugins/sitestatistics/siteuserinfo/unit3d.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+import re
+from typing import Optional
+
+from lxml import etree
+
+from app.plugins.sitestatistics.siteuserinfo import ISiteUserInfo, SITE_BASE_ORDER
+from app.utils.string import StringUtils
+from app.utils.types import SiteSchema
+
+
+class Unit3dSiteUserInfo(ISiteUserInfo):
+    schema = SiteSchema.Unit3d
+    order = SITE_BASE_ORDER + 15
+
+    @classmethod
+    def match(cls, html_text: str) -> bool:
+        return "unit3d.js" in html_text
+
+    def _parse_user_base_info(self, html_text: str):
+        html_text = self._prepare_html_text(html_text)
+        html = etree.HTML(html_text)
+
+        tmps = html.xpath('//a[contains(@href, "/users/") and contains(@href, "settings")]/@href')
+        if tmps:
+            user_name_match = re.search(r"/users/(.+)/settings", tmps[0])
+            if user_name_match and user_name_match.group().strip():
+                self.username = user_name_match.group(1)
+                self._torrent_seeding_page = f"/users/{self.username}/active?perPage=100&client=&seeding=include"
+                self._user_detail_page = f"/users/{self.username}"
+
+        tmps = html.xpath('//a[contains(@href, "bonus/earnings")]')
+        if tmps:
+            bonus_text = tmps[0].xpath("string(.)")
+            bonus_match = re.search(r"([\d,.]+)", bonus_text)
+            if bonus_match and bonus_match.group(1).strip():
+                self.bonus = StringUtils.str_float(bonus_match.group(1))
+
+    def _parse_site_page(self, html_text: str):
+        # TODO
+        pass
+
+    def _parse_user_detail_info(self, html_text: str):
+        """
+        Parse extra user info: join date, user level
+        :param html_text:
+        :return:
+        """
+        html = etree.HTML(html_text)
+        if not html:
+            return None
+
+        # User level
+        user_levels_text = html.xpath('//div[contains(@class, "content")]'
+                                      '//span[contains(@class, "badge-user")]/text()')
+        if user_levels_text:
+            self.user_level = user_levels_text[0].strip()
+
+        # Join date
+        join_at_text = html.xpath('//div[contains(@class, "content")]//h4[contains(text(), "注册日期") '
+                                  'or contains(text(), "註冊日期") '
+                                  'or contains(text(), "Registration date")]/text()')
+        if join_at_text:
+            self.join_at = StringUtils.unify_datetime_str(
+                join_at_text[0].replace('注册日期', '').replace('註冊日期', '').replace('Registration date', ''))
+
+    def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
+        """
+        Seeding info
+        :param html_text:
+        :param multi_page: whether the data spans multiple pages
+        :return: url of the next page
+        """
+        html = etree.HTML(html_text)
+        if not html:
+            return None
+
+        size_col = 9
+        seeders_col = 2
+        # Locate the size column
+        if html.xpath('//thead//th[contains(@class,"size")]'):
+            size_col = len(html.xpath('//thead//th[contains(@class,"size")][1]/preceding-sibling::th')) + 1
+        # Locate the seeders column
+        if html.xpath('//thead//th[contains(@class,"seeders")]'):
+            seeders_col = len(html.xpath('//thead//th[contains(@class,"seeders")]/preceding-sibling::th')) + 1
+
+        page_seeding = 0
+        page_seeding_size = 0
+        page_seeding_info = []
+        seeding_sizes = html.xpath(f'//tr[position()]/td[{size_col}]')
+        seeding_seeders = html.xpath(f'//tr[position()]/td[{seeders_col}]')
+        if seeding_sizes and seeding_seeders:
+            page_seeding = len(seeding_sizes)
+
+            for i in range(0, len(seeding_sizes)):
+                size = StringUtils.num_filesize(seeding_sizes[i].xpath("string(.)").strip())
+                seeders = StringUtils.str_int(seeding_seeders[i].xpath("string(.)").strip())
+
+                page_seeding_size += size
+                page_seeding_info.append([seeders, size])
+
+        self.seeding += page_seeding
+        self.seeding_size += page_seeding_size
+        self.seeding_info.extend(page_seeding_info)
+
+        # Is there a next page?
+        next_page = None
+        next_pages = html.xpath('//ul[@class="pagination"]/li[contains(@class,"active")]/following-sibling::li')
+        if next_pages and len(next_pages) > 1:
+            page_num = next_pages[0].xpath("string(.)").strip()
+            if page_num.isdigit():
+                next_page = f"{self._torrent_seeding_page}&page={page_num}"
+
+        return next_page
+
+    def _parse_user_traffic_info(self, html_text: str):
+        html_text = self._prepare_html_text(html_text)
+        upload_match = re.search(r"[^总]上[传傳]量?[::_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
+                                 re.IGNORECASE)
+        self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0
+        download_match = re.search(r"[^总子影力]下[载載]量?[::_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text,
+                                   re.IGNORECASE)
+        self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0
+        ratio_match = re.search(r"分享率[::_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)", html_text)
+        self.ratio = StringUtils.str_float(ratio_match.group(1)) if (
+                ratio_match and ratio_match.group(1).strip()) else 0.0
+
+    def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
+        return None
+
+    def _parse_message_content(self, html_text):
+        return None, None, None
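UNIT3D themes reorder table columns, so the parser above does not hard-code `td` positions: it counts the `<th>` cells preceding the header whose class marks the column, falling back to fixed defaults only when no such header exists. A self-contained sketch of that column auto-detection with an invented table:

```python
# Sketch of the <thead>-based column detection; the sample table is invented.
from lxml import etree

SAMPLE = """
<table>
  <thead><tr><th class="name">Name</th><th class="seeders">Seeders</th><th class="size">Size</th></tr></thead>
</table>
"""

html = etree.HTML(SAMPLE)
# XPath positions are 1-based, hence the +1 after counting preceding siblings
size_col = len(html.xpath('//thead//th[contains(@class,"size")][1]/preceding-sibling::th')) + 1
seeders_col = len(html.xpath('//thead//th[contains(@class,"seeders")]/preceding-sibling::th')) + 1
print(size_col, seeders_col)  # -> 3 2
```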
f"{self._torrent_seeding_page}&page={page_num}" + + return next_page + + def _parse_user_traffic_info(self, html_text: str): + html_text = self._prepare_html_text(html_text) + upload_match = re.search(r"[^总]上[传傳]量?[::_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text, + re.IGNORECASE) + self.upload = StringUtils.num_filesize(upload_match.group(1).strip()) if upload_match else 0 + download_match = re.search(r"[^总子影力]下[载載]量?[::_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+[KMGTPI]*B)", html_text, + re.IGNORECASE) + self.download = StringUtils.num_filesize(download_match.group(1).strip()) if download_match else 0 + ratio_match = re.search(r"分享率[::_<>/a-zA-Z-=\"'\s#;]+([\d,.\s]+)", html_text) + self.ratio = StringUtils.str_float(ratio_match.group(1)) if ( + ratio_match and ratio_match.group(1).strip()) else 0.0 + + def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]: + return None + + def _parse_message_content(self, html_text): + return None, None, None diff --git a/app/schemas/site.py b/app/schemas/site.py index 22caac9a..b264cc0e 100644 --- a/app/schemas/site.py +++ b/app/schemas/site.py @@ -12,6 +12,7 @@ class Site(BaseModel): rss: Optional[str] = None cookie: Optional[str] = None ua: Optional[str] = None + proxy: Optional[int] = 0 filter: Optional[str] = None note: Optional[str] = None limit_interval: Optional[int] = 0 diff --git a/app/utils/types.py b/app/utils/types.py index 0cbee166..f2296cab 100644 --- a/app/utils/types.py +++ b/app/utils/types.py @@ -17,9 +17,26 @@ class EventType(Enum): PluginReload = "plugin.reload" # 执行命令 CommandExcute = "command.excute" + # 站点签到 + SiteSignin = "site.signin" # 系统配置Key字典 class SystemConfigKey(Enum): # 用户已安装的插件 UserInstalledPlugins = "UserInstalledPlugins" + + +# 站点框架 +class SiteSchema(Enum): + DiscuzX = "Discuz!" + Gazelle = "Gazelle" + Ipt = "IPTorrents" + NexusPhp = "NexusPhp" + NexusProject = "NexusProject" + NexusRabbit = "NexusRabbit" + SmallHorse = "Small Horse" + Unit3d = "Unit3d" + TorrentLeech = "TorrentLeech" + FileList = "FileList" + TNode = "TNode" diff --git a/requirements.txt b/requirements.txt index 61cb5516..c8285de2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +Cython~=0.29.35 fast-bencode~=1.1.3 pydantic~=1.10.8 SQLAlchemy~=2.0.15 @@ -29,4 +30,9 @@ plexapi~=4.14.0 transmission-rpc~=4.3.0 feapder~=1.8.5 Jinja2~=3.1.2 -pyparsing~=3.0.9 \ No newline at end of file +pyparsing~=3.0.9 +func_timeout==4.3.5 +selenium~=4.9.1 +bs4~=0.0.1 +beautifulsoup4~=4.12.2 +pillow==9.5.0 \ No newline at end of file