From d4c28500b7c7473392adf5a138ef8c9b4ab7737d Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 7 Sep 2023 22:04:07 +0800 Subject: [PATCH] fix plugin --- app/plugins/brushflow/__init__.py | 500 +++++++++++++++++++++++++++++- 1 file changed, 492 insertions(+), 8 deletions(-) diff --git a/app/plugins/brushflow/__init__.py b/app/plugins/brushflow/__init__.py index 62a14082..0b831e1f 100644 --- a/app/plugins/brushflow/__init__.py +++ b/app/plugins/brushflow/__init__.py @@ -1,17 +1,28 @@ +import re +import threading +import time from datetime import datetime, timedelta from threading import Event -from typing import Any, List, Dict, Tuple +from typing import Any, List, Dict, Tuple, Optional, Union import pytz from apscheduler.schedulers.background import BackgroundScheduler +from app import schemas +from app.chain.search import SearchChain from app.chain.torrents import TorrentsChain from app.core.config import settings +from app.db.site_oper import SiteOper from app.helper.sites import SitesHelper from app.log import logger +from app.modules.qbittorrent import Qbittorrent +from app.modules.transmission import Transmission from app.plugins import _PluginBase +from app.schemas import Notification, NotificationType, TorrentInfo from app.utils.string import StringUtils +lock = threading.Lock() + class BrushFlow(_PluginBase): # 插件名称 @@ -36,8 +47,12 @@ class BrushFlow(_PluginBase): auth_level = 3 # 私有属性 - sites = None + siteshelper = None + siteoper = None torrents = None + searchchain = None + qb = None + tr = None # 添加种子定时 _cron = 10 # 检查种子定时 @@ -71,8 +86,10 @@ class BrushFlow(_PluginBase): _save_path = "" def init_plugin(self, config: dict = None): - self.sites = SitesHelper() + self.siteshelper = SitesHelper() + self.siteoper = SiteOper() self.torrents = TorrentsChain() + self.searchchain = SearchChain() if config: self._enabled = config.get("enabled") self._notify = config.get("notify") @@ -104,60 +121,76 @@ class BrushFlow(_PluginBase): # 启动定时任务 & 立即运行一次 if self.get_state() or self._onlyonce: + self.qb = Qbittorrent() + self.tr = Transmission() # 检查配置 if self._disksize and not StringUtils.is_number(self._disksize): + self._disksize = 0 logger.error(f"保种体积设置错误:{self._disksize}") self.systemmessage.put(f"保种体积设置错误:{self._disksize}") return if self._maxupspeed and not StringUtils.is_number(self._maxupspeed): + self._maxupspeed = 0 logger.error(f"总上传带宽设置错误:{self._maxupspeed}") self.systemmessage.put(f"总上传带宽设置错误:{self._maxupspeed}") return if self._maxdlspeed and not StringUtils.is_number(self._maxdlspeed): + self._maxdlspeed = 0 logger.error(f"总下载带宽设置错误:{self._maxdlspeed}") self.systemmessage.put(f"总下载带宽设置错误:{self._maxdlspeed}") return if self._maxdlcount and not StringUtils.is_number(self._maxdlcount): + self._maxdlcount = 0 logger.error(f"同时下载任务数设置错误:{self._maxdlcount}") self.systemmessage.put(f"同时下载任务数设置错误:{self._maxdlcount}") return if self._size and not StringUtils.is_number(self._size): + self._size = 0 logger.error(f"种子大小设置错误:{self._size}") self.systemmessage.put(f"种子大小设置错误:{self._size}") return if self._seeder and not StringUtils.is_number(self._seeder): + self._seeder = 0 logger.error(f"做种人数设置错误:{self._seeder}") self.systemmessage.put(f"做种人数设置错误:{self._seeder}") return if self._seed_time and not StringUtils.is_number(self._seed_time): + self._seed_time = 0 logger.error(f"做种时间设置错误:{self._seed_time}") self.systemmessage.put(f"做种时间设置错误:{self._seed_time}") return if self._seed_ratio and not StringUtils.is_number(self._seed_ratio): + self._seed_ratio = 0 logger.error(f"分享率设置错误:{self._seed_ratio}") self.systemmessage.put(f"分享率设置错误:{self._seed_ratio}") return if self._seed_size and not StringUtils.is_number(self._seed_size): + self._seed_size = 0 logger.error(f"上传量设置错误:{self._seed_size}") self.systemmessage.put(f"上传量设置错误:{self._seed_size}") return if self._download_time and not StringUtils.is_number(self._download_time): + self._download_time = 0 logger.error(f"下载超时时间设置错误:{self._download_time}") self.systemmessage.put(f"下载超时时间设置错误:{self._download_time}") return if self._seed_avgspeed and not StringUtils.is_number(self._seed_avgspeed): + self._seed_avgspeed = 0 logger.error(f"平均上传速度设置错误:{self._seed_avgspeed}") self.systemmessage.put(f"平均上传速度设置错误:{self._seed_avgspeed}") return if self._seed_inactivetime and not StringUtils.is_number(self._seed_inactivetime): + self._seed_inactivetime = 0 logger.error(f"未活动时间设置错误:{self._seed_inactivetime}") self.systemmessage.put(f"未活动时间设置错误:{self._seed_inactivetime}") return if self._up_speed and not StringUtils.is_number(self._up_speed): + self._up_speed = 0 logger.error(f"单任务上传限速设置错误:{self._up_speed}") self.systemmessage.put(f"单任务上传限速设置错误:{self._up_speed}") return if self._dl_speed and not StringUtils.is_number(self._dl_speed): + self._dl_speed = 0 logger.error(f"单任务下载限速设置错误:{self._dl_speed}") self.systemmessage.put(f"单任务下载限速设置错误:{self._dl_speed}") return @@ -232,7 +265,7 @@ class BrushFlow(_PluginBase): """ # 站点的可选项 site_options = [{"title": site.get("name"), "value": site.get("id")} - for site in self.sites.get_indexers()] + for site in self.siteshelper.get_indexers()] return [ { 'component': 'VForm', @@ -705,16 +738,467 @@ class BrushFlow(_PluginBase): """ if not self._brushsites or not self._downloader: return - logger.info(f"开始执行刷流任务 ...") - logger.info(f"刷流任务执行完成") + with lock: + logger.info(f"开始执行刷流任务 ...") + # 读取种子记录 + task_info = self.get_data("torrents") or {} + if task_info: + # 当前保种大小 + torrents_size = sum([task.get("size") or 0 for task in task_info.values()]) + else: + torrents_size = 0 + # 处理所有站点 + for siteid in self._brushsites: + siteinfo = self.siteoper.get(siteid) + if not siteinfo: + logger.warn(f"站点不存在:{siteid}") + continue + logger.info(f"开始获取站点 {siteinfo.name} 的新种子 ...") + torrents = self.searchchain.browse(domain=siteinfo.domain) + if not torrents: + logger.info(f"站点 {siteinfo.name} 没有获取到种子") + continue + # 过滤种子 + for torrent in torrents: + # 保种体积(GB) 促销 + if self._disksize \ + and (torrents_size + torrent.size) > self._size * 1024**3: + logger.warn(f"当前做种体积 {StringUtils.str_filesize(torrents_size)} 已超过保种体积 {self._disksize},停止新增任务") + return + # 促销 + if self._freeleech and torrent.downloadvolumefactor != 0: + continue + if self._freeleech == "2xfree" and torrent.uploadvolumefactor != 2: + continue + # 包含规则 + if self._include and not re.search(r"%s" % self._include, torrent.title, re.I): + continue + # 排除规则 + if self._exclude and re.search(r"%s" % self._exclude, torrent.title, re.I): + continue + # 种子大小(GB) + if self._size: + sizes = str(self._size).split("-") + begin_size = sizes[0] + if len(sizes) > 1: + end_size = sizes[-1] + else: + end_size = 0 + if begin_size and not end_size \ + and torrent.size > float(begin_size) * 1024**3: + continue + elif begin_size and end_size \ + and not float(begin_size) * 1024**3 <= torrent.size <= float(end_size) * 1024**3: + continue + # 做种人数 + if self._seeder: + seeders = str(self._seeder).split("-") + begin_seeder = seeders[0] + if len(seeders) > 1: + end_seeder = seeders[-1] + else: + end_seeder = 0 + if begin_seeder and not end_seeder \ + and torrent.seeders > int(begin_seeder): + continue + elif begin_seeder and end_seeder \ + and not int(begin_seeder) <= torrent.seeders <= int(end_seeder): + continue + # 计算发布时间 + pubdate = StringUtils.get_time(pubdate) + localtz = pytz.timezone(settings.TZ) + localnowtime = datetime.now().astimezone(localtz) + localpubdate = pubdate.astimezone(localtz) + pudate_minutes = int(localnowtime.timestamp() - localpubdate.timestamp()) / 60 + # 发布时间(分钟) + if self._pubtime: + pubtimes = str(self._pubtime).split("-") + begin_pubtime = pubtimes[0] + if len(pubtimes) > 1: + end_pubtime = pubtimes[-1] + else: + end_pubtime = 0 + # 将种子发布日志转换为与当前时间的差 + if begin_pubtime and not end_pubtime \ + and pudate_minutes > int(begin_pubtime) * 60: + continue + elif begin_pubtime and end_pubtime \ + and not int(begin_pubtime) * 60 <= pudate_minutes <= int(end_pubtime) * 60: + continue + # 同时下载任务数 + downloads = self.__get_downloading_count(self._downloader) + if self._maxdlcount and downloads >= self._maxdlcount: + continue + # 获取下载器的下载信息 + downloader_info = self.__get_downloader_info() + if downloader_info: + current_upload_speed = downloader_info.upload_speed or 0 + current_download_speed = downloader_info.download_speed or 0 + # 总上传带宽(KB/s) + if self._maxupspeed \ + and current_upload_speed >= float(self._maxupspeed) * 1024: + continue + # 总下载带宽(KB/s) + if self._maxdlspeed \ + and current_download_speed >= float(self._maxdlspeed) * 1024: + continue + # 添加下载任务 + hash_string = self.__download(torrent=torrent) + if not hash_string: + logger.warn(f"{torrent.title} 添加刷流任务失败!") + continue + # 保存任务信息 + task_info[hash_string] = { + "site_name": torrent.site_name, + "size": torrent.size + } + # 发送消息 + self.__send_add_message(torrent) + + # 保存数据 + self.save_data("torrents", task_info) + logger.info(f"刷流任务执行完成") def check(self): """ 定时检查,删除下载任务 + { + hash: { + site_name: + size: + } + } """ if not self._downloader: return - logger.info(f"开始检查刷流下载任务 ...") - logger.info(f"刷流下载任务检查完成") + with lock: + logger.info(f"开始检查刷流下载任务 ...") + # 读取种子记录 + task_info = self.get_data("torrents") or {} + if not task_info: + logger.info(f"没有需要检查的刷流下载任务") + return + # 种子Hash + check_hashs = list(task_info.keys()) + logger.info(f"共有 {len(check_hashs)} 个任务正在刷流,开始检查任务状态") + # 获取下载器实例 + downloader = self.__get_downloader(self._downloader) + if not downloader: + return + # 获取下载器中的种子 + torrents, state = downloader.get_torrents(ids=check_hashs) + if not state: + logger.warn("连接下载器出错,将在下个时间周期重试") + return + if not torrents: + logger.warn(f"刷流任务在下载器中不存在,清除记录") + self.save_data("hashs", {}) + return + # 检查种子状态,判断是否要删种 + for torrent in torrents: + site_name = task_info.get(self.__get_hash(torrent, self._downloader)).get("site_name") + torrent_info = self.__get_torrent_info(self._downloader, torrent) + # 做种时间(小时) + if self._seed_time: + if torrent_info.get("seeding_time") >= float(self._seed_time) * 3600: + logger.info(f"做种时间达到 {self._seed_time} 小时,删除种子:{torrent_info.get('title')}") + downloader.delete_torrents(ids=torrent_info.get("hash"), delete_file=True) + task_info.pop(torrent_info.get('hash')) + self.__send_delete_message(site_name=site_name, + torrent_title=torrent_info.get("title"), + reason=f"做种时间达到 {self._seed_time} 小时") + continue + # 分享率 + if self._seed_ratio: + if torrent_info.get("ratio") >= float(self._seed_ratio): + logger.info(f"分享率达到 {self._seed_ratio},删除种子:{torrent_info.get('title')}") + downloader.delete_torrents(ids=torrent_info.get("hash"), delete_file=True) + task_info.pop(torrent_info.get('hash')) + self.__send_delete_message(site_name=site_name, + torrent_title=torrent_info.get("title"), + reason=f"分享率达到 {self._seed_ratio}") + continue + # 上传量(GB) + if self._seed_size: + if torrent_info.get("uploaded") >= float(self._seed_size) * 1024 * 1024 * 1024: + logger.info(f"上传量达到 {self._seed_size} GB,删除种子:{torrent_info.get('title')}") + downloader.delete_torrents(ids=torrent_info.get("hash"), delete_file=True) + task_info.pop(torrent_info.get('hash')) + self.__send_delete_message(site_name=site_name, + torrent_title=torrent_info.get("title"), + reason=f"上传量达到 {self._seed_size} GB") + continue + # 下载耗时(小时) + if self._download_time \ + and torrent_info.get("downloaded") < torrent_info.get("total_size"): + if torrent_info.get("dltime") >= float(self._download_time) * 3600: + logger.info(f"下载耗时达到 {self._download_time} 小时,删除种子:{torrent_info.get('title')}") + downloader.delete_torrents(ids=torrent_info.get("hash"), delete_file=True) + task_info.pop(torrent_info.get('hash')) + self.__send_delete_message(site_name=site_name, + torrent_title=torrent_info.get("title"), + reason=f"下载耗时达到 {self._download_time} 小时") + continue + # 平均上传速度(KB / s) + if self._seed_avgspeed: + if torrent_info.get("avg_upspeed") <= float(self._seed_avgspeed) * 1024: + logger.info(f"平均上传速度低于 {self._seed_avgspeed} KB/s,删除种子:{torrent_info.get('title')}") + downloader.delete_torrents(ids=torrent_info.get("hash"), delete_file=True) + task_info.pop(torrent_info.get('hash')) + self.__send_delete_message(site_name=site_name, + torrent_title=torrent_info.get("title"), + reason=f"平均上传速度低于 {self._seed_avgspeed} KB/s") + continue + # 未活动时间(分钟) + if self._seed_inactivetime: + if torrent_info.get("iatime") >= float(self._seed_inactivetime) * 60: + logger.info( + f"未活动时间达到 {self._seed_inactivetime} 分钟,删除种子:{torrent_info.get('title')}") + downloader.delete_torrents(ids=torrent_info.get("hash"), delete_file=True) + task_info.pop(torrent_info.get('hash')) + self.__send_delete_message(site_name=site_name, + torrent_title=torrent_info.get("title"), + reason=f"未活动时间达到 {self._seed_inactivetime} 分钟") + continue + self.save_data("torrents", task_info) + logger.info(f"刷流下载任务检查完成") + + def __get_downloader(self, dtype: str) -> Optional[Union[Transmission, Qbittorrent]]: + """ + 根据类型返回下载器实例 + """ + if dtype == "qbittorrent": + return self.qb + elif dtype == "transmission": + return self.tr + else: + return None + + def __download(self, torrent: TorrentInfo) -> Optional[str]: + """ + 添加下载任务 + """ + if self._downloader == "qbittorrent": + if not self.qb: + return None + # 生成随机Tag + tag = StringUtils.generate_random_str(10) + state = self.qb.add_torrent(content=torrent.enclosure, + download_dir=self._save_path or None, + cookie=torrent.site_cookie, + tag=["已整理", "刷流", tag]) + if not state: + return None + else: + # 获取种子Hash + torrent_hash = self.qb.get_torrent_id_by_tag(tags=tag) + if not torrent_hash: + logger.error(f"{self._downloader} 获取种子Hash失败") + return None + return torrent_hash + elif self._downloader == "transmission": + if not self.tr: + return None + # 添加任务 + torrent = self.tr.add_torrent(content=torrent.enclosure, + download_dir=self._save_path or None, + cookie=torrent.site_cookie, + labels=["已整理", "刷流"]) + if not torrent: + return None + else: + return torrent.hashString + return None + + @staticmethod + def __get_hash(torrent: Any, dl_type: str): + """ + 获取种子hash + """ + try: + return torrent.get("hash") if dl_type == "qbittorrent" else torrent.hashString + except Exception as e: + print(str(e)) + return "" + + @staticmethod + def __get_label(torrent: Any, dl_type: str): + """ + 获取种子标签 + """ + try: + return [str(tag).strip() for tag in torrent.get("tags").split(',')] \ + if dl_type == "qbittorrent" else torrent.labels or [] + except Exception as e: + print(str(e)) + return [] + + @staticmethod + def __get_torrent_info(downloader_type: str, torrent: Any) -> dict: + + # 当前时间戳 + date_now = int(time.time()) + # QB + if downloader_type == "qbittorrent": + # ID + torrent_id = torrent.get("hash") + # 标题 + torrent_title = torrent.get("name") + # 下载时间 + dltime = date_now - torrent.get("added_on") if torrent.get("added_on") else 0 + # 做种时间 + seeding_time = date_now - torrent.get("completion_on") if torrent.get("completion_on") else 0 + # 分享率 + ratio = torrent.get("ratio") or 0 + # 上传量 + uploaded = torrent.get("uploaded") or 0 + # 平均上传速度 Byte/s + if dltime: + avg_upspeed = int(uploaded / dltime) + else: + avg_upspeed = uploaded + # 已未活动 秒 + iatime = date_now - torrent.get("last_activity") if torrent.get("last_activity") else 0 + # 下载量 + downloaded = torrent.get("downloaded") + # 种子大小 + total_size = torrent.get("total_size") + # 添加时间 + add_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(torrent.get("added_on") or 0)) + # TR + else: + # ID + torrent_id = torrent.hashString + # 标题 + torrent_title = torrent.name + # 做种时间 + if not torrent.date_done or torrent.date_done.timestamp() < 1: + seeding_time = 0 + else: + seeding_time = date_now - int(torrent.date_done.timestamp()) + # 下载耗时 + if not torrent.date_added or torrent.date_added.timestamp() < 1: + dltime = 0 + else: + dltime = date_now - int(torrent.date_added.timestamp()) + # 下载量 + downloaded = int(torrent.total_size * torrent.progress / 100) + # 分享率 + ratio = torrent.ratio or 0 + # 上传量 + uploaded = int(downloaded * torrent.ratio) + # 平均上传速度 + if dltime: + avg_upspeed = int(uploaded / dltime) + else: + avg_upspeed = uploaded + # 未活动时间 + if not torrent.date_active or torrent.date_active.timestamp() < 1: + iatime = 0 + else: + iatime = date_now - int(torrent.date_active.timestamp()) + # 种子大小 + total_size = torrent.total_size + # 添加时间 + add_time = time.strftime('%Y-%m-%d %H:%M:%S', + time.localtime(torrent.date_added.timestamp() if torrent.date_added else 0)) + + return { + "hash": torrent_id, + "title": torrent_title, + "seeding_time": seeding_time, + "ratio": ratio, + "uploaded": uploaded, + "downloaded": downloaded, + "avg_upspeed": avg_upspeed, + "iatime": iatime, + "dltime": dltime, + "total_size": total_size, + "add_time": add_time + } + + def __send_delete_message(self, site_name: str, torrent_title: str, reason: str): + """ + 发送删除种子的消息 + """ + if self._notify: + self.chain.post_message(Notification( + mtype=NotificationType.SiteMessage, + title=f"【刷流任务删种】", + text=f"站点:{site_name}\n" + f"标题:{torrent_title}\n" + f"原因:{reason}" + )) + + def __send_add_message(self, torrent: TorrentInfo): + """ + 发送添加下载的消息 + """ + msg_text = "" + if torrent.site_name: + msg_text = f"站点:{torrent.site_name}" + if torrent.title: + msg_text = f"{msg_text}\n标题:{torrent.title}" + if torrent.size: + if str(torrent.size).replace(".", "").isdigit(): + size = StringUtils.str_filesize(torrent.size) + else: + size = torrent.size + msg_text = f"{msg_text}\n大小:{size}" + if torrent.seeders: + msg_text = f"{msg_text}\n做种数:{torrent.seeders}" + if torrent.volume_factor: + msg_text = f"{msg_text}\n促销:{torrent.volume_factor}" + if torrent.hit_and_run: + msg_text = f"{msg_text}\nHit&Run:是" + + self.chain.post_message(Notification( + mtype=NotificationType.SiteMessage, + title="【刷流任务种子下载】", + text=msg_text + )) + + def __get_torrents_size(self) -> int: + """ + 获取任务中的种子总大小 + """ + # 读取种子记录 + task_info = self.get_data("torrents") or {} + if not task_info: + return 0 + total_size = sum([task.get("size") or 0 for task in task_info.values()]) + return total_size + + def __get_downloader_info(self) -> schemas.DownloaderInfo: + """ + 获取下载器实时信息 + """ + if self._downloader == "qbittorrent": + # 调用Qbittorrent API查询实时信息 + info = self.qb.transfer_info() + return schemas.DownloaderInfo( + download_speed=info.get("dl_info_speed"), + upload_speed=info.get("up_info_speed"), + download_size=info.get("dl_info_data"), + upload_size=info.get("up_info_data") + ) + else: + info = self.tr.transfer_info() + return schemas.DownloaderInfo( + download_speed=info.download_speed, + upload_speed=info.upload_speed, + download_size=info.current_stats.downloaded_bytes, + upload_size=info.current_stats.uploaded_bytes + ) + + def __get_downloading_count(self, dltype: str) -> int: + """ + 获取正在下载的任务数量 + """ + downlader = self.__get_downloader(dltype) + if not downlader: + return 0 + torrents = downlader.get_downloading_torrents() + return len(torrents) or 0