add plugins

This commit is contained in:
jxxghp 2023-08-07 20:40:07 +08:00
parent d682c5620a
commit 3e9091f0ef
3 changed files with 1381 additions and 0 deletions

View File

@ -0,0 +1,701 @@
import re
from datetime import datetime, timedelta
from threading import Event
from typing import Any, List, Dict, Tuple
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from lxml import etree
from app.core.config import settings
from app.log import logger
from app.plugins import _PluginBase
from app.plugins.iyuuautoseed.iyuu_helper import IyuuHelper
from app.utils.http import RequestUtils
class IYUUAutoSeed(_PluginBase):
# 插件名称
plugin_name = "IYUU自动辅种"
# 插件描述
plugin_desc = "基于IYUU官方Api实现自动辅种。"
# 插件图标
plugin_icon = "iyuu.png"
# 主题色
plugin_color = "#F3B70B"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "jxxghp"
# 作者主页
author_url = "https://github.com/jxxghp"
# 插件配置项ID前缀
plugin_config_prefix = "iyuuautoseed_"
# 加载顺序
plugin_order = 17
# 可使用的用户级别
auth_level = 2
# 私有属性
_scheduler = None
iyuuhelper = None
sites = None
# 开关
_enabled = False
_cron = None
_onlyonce = False
_token = None
_downloaders = []
_sites = []
_notify = False
_nolabels = None
_clearcache = False
# 退出事件
_event = Event()
# 种子链接xpaths
_torrent_xpaths = [
"//form[contains(@action, 'download.php?id=')]/@action",
"//a[contains(@href, 'download.php?hash=')]/@href",
"//a[contains(@href, 'download.php?id=')]/@href",
"//a[@class='index'][contains(@href, '/dl/')]/@href",
]
_torrent_tags = ["已整理", "辅种"]
# 待校全种子hash清单
_recheck_torrents = {}
_is_recheck_running = False
# 辅种缓存,出错的种子不再重复辅种,可清除
_error_caches = []
# 辅种缓存,辅种成功的种子,可清除
_success_caches = []
# 辅种缓存出错的种子不再重复辅种且无法清除。种子被删除404等情况
_permanent_error_caches = []
# 辅种计数
total = 0
realtotal = 0
success = 0
exist = 0
fail = 0
cached = 0
def init_plugin(self, config: dict = None):
# 读取配置
if config:
self._enabled = config.get("enabled")
self._onlyonce = config.get("onlyonce")
self._cron = config.get("cron")
self._token = config.get("token")
self._downloaders = config.get("downloaders")
self._sites = config.get("sites")
self._notify = config.get("notify")
self._nolabels = config.get("nolabels")
self._clearcache = config.get("clearcache")
self._permanent_error_caches = config.get("permanent_error_caches") or []
self._error_caches = [] if self._clearcache else config.get("error_caches") or []
self._success_caches = [] if self._clearcache else config.get("success_caches") or []
# 停止现有任务
self.stop_service()
# 启动定时任务 & 立即运行一次
if self.get_state() or self._onlyonce:
self.iyuuhelper = IyuuHelper(token=self._token)
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
if self._cron:
try:
self._scheduler.add_job(self.auto_seed,
CronTrigger.from_crontab(self._cron))
logger.info(f"辅种服务启动,周期:{self._cron}")
except Exception as err:
logger.error(f"辅种服务启动失败:{str(err)}")
self.systemmessage.put(f"辅种服务启动失败:{str(err)}")
if self._onlyonce:
logger.info(f"辅种服务启动,立即运行一次")
self._scheduler.add_job(self.auto_seed, 'date',
run_date=datetime.now(
tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3)
)
# 关闭一次性开关
self._onlyonce = False
if self._clearcache:
# 关闭清除缓存开关
self._clearcache = False
if self._clearcache or self._onlyonce:
# 保存配置
self.__update_config()
if self._scheduler.get_jobs():
# 追加种子校验服务
self._scheduler.add_job(self.check_recheck, 'interval', minutes=3)
# 启动服务
self._scheduler.print_jobs()
self._scheduler.start()
def get_state(self) -> bool:
return True if self._enabled and self._cron and self._token and self._downloaders else False
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1页面配置2数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'notify',
'label': '发送通知',
}
}
]
}
]
}
]
}
], {
"enable": False,
"onlyonce": False,
"notify": False,
"clearcache": False,
"cron": "",
"token": "",
"downloaders": [],
"sites": [],
"nolabels": ""
}
def get_page(self) -> List[dict]:
pass
def __update_config(self):
self.update_config({
"enable": self._enabled,
"onlyonce": self._onlyonce,
"clearcache": self._clearcache,
"cron": self._cron,
"token": self._token,
"downloaders": self._downloaders,
"sites": self._sites,
"notify": self._notify,
"nolabels": self._nolabels,
"success_caches": self._success_caches,
"error_caches": self._error_caches,
"permanent_error_caches": self._permanent_error_caches
})
def auto_seed(self):
"""
开始辅种
"""
if not self._enabled or not self._token or not self._downloaders:
logger.warn("辅种服务未启用或未配置")
return
if not self.iyuuhelper:
return
logger.info("开始辅种任务 ...")
# 计数器初始化
self.total = 0
self.realtotal = 0
self.success = 0
self.exist = 0
self.fail = 0
self.cached = 0
# 扫描下载器辅种
for downloader in self._downloaders:
logger.info(f"开始扫描下载器 {downloader} ...")
# TODO 获取下载器中已完成的种子
torrents = []
if torrents:
logger.info(f"下载器 {downloader} 已完成种子数:{len(torrents)}")
else:
logger.info(f"下载器 {downloader} 没有已完成种子")
continue
hash_strs = []
for torrent in torrents:
if self._event.is_set():
logger.info(f"辅种服务停止")
return
# 获取种子hash
hash_str = self.__get_hash(torrent, downloader)
if hash_str in self._error_caches or hash_str in self._permanent_error_caches:
logger.info(f"种子 {hash_str} 辅种失败且已缓存,跳过 ...")
continue
save_path = self.__get_save_path(torrent, downloader)
# 获取种子标签
torrent_labels = self.__get_label(torrent, downloader)
if torrent_labels and self._nolabels:
is_skip = False
for label in self._nolabels.split(','):
if label in torrent_labels:
logger.info(f"种子 {hash_str} 含有不转移标签 {label},跳过 ...")
is_skip = True
break
if is_skip:
continue
hash_strs.append({
"hash": hash_str,
"save_path": save_path
})
if hash_strs:
logger.info(f"总共需要辅种的种子数:{len(hash_strs)}")
# 分组处理减少IYUU Api请求次数
chunk_size = 200
for i in range(0, len(hash_strs), chunk_size):
# 切片操作
chunk = hash_strs[i:i + chunk_size]
# 处理分组
self.__seed_torrents(hash_strs=chunk,
downloader=downloader)
# 触发校验检查
self.check_recheck()
else:
logger.info(f"没有需要辅种的种子")
# 保存缓存
self.__update_config()
# 发送消息
if self._notify:
if self.success or self.fail:
self.post_message(
title="【IYUU自动辅种任务完成】",
text=f"服务器返回可辅种总数:{self.total}\n"
f"实际可辅种数:{self.realtotal}\n"
f"已存在:{self.exist}\n"
f"成功:{self.success}\n"
f"失败:{self.fail}\n"
f"{self.cached} 条失败记录已加入缓存"
)
logger.info("辅种任务执行完成")
def check_recheck(self):
"""
定时检查下载器中种子是否校验完成校验完成且完整的自动开始辅种
"""
if not self._recheck_torrents:
return
if self._is_recheck_running:
return
self._is_recheck_running = True
for downloader in self._downloaders:
# 需要检查的种子
recheck_torrents = self._recheck_torrents.get(downloader) or []
if not recheck_torrents:
continue
logger.info(f"开始检查下载器 {downloader} 的校验任务 ...")
# 下载器类型
# TODO 获取下载器中的种子
torrents = []
if torrents:
can_seeding_torrents = []
for torrent in torrents:
# 获取种子hash
hash_str = self.__get_hash(torrent, downloader)
if self.__can_seeding(torrent, downloader):
can_seeding_torrents.append(hash_str)
if can_seeding_torrents:
logger.info(f"{len(can_seeding_torrents)} 个任务校验完成,开始辅种 ...")
# TODO 开始任务
# 去除已经处理过的种子
self._recheck_torrents[downloader] = list(
set(recheck_torrents).difference(set(can_seeding_torrents)))
elif torrents is None:
logger.info(f"下载器 {downloader} 查询校验任务失败,将在下次继续查询 ...")
continue
else:
logger.info(f"下载器 {downloader} 中没有需要检查的校验任务,清空待处理列表 ...")
self._recheck_torrents[downloader] = []
self._is_recheck_running = False
def __seed_torrents(self, hash_strs: list, downloader: str):
"""
执行一批种子的辅种
"""
if not hash_strs:
return
logger.info(f"下载器 {downloader} 开始查询辅种,数量:{len(hash_strs)} ...")
# 下载器中的Hashs
hashs = [item.get("hash") for item in hash_strs]
# 每个Hash的保存目录
save_paths = {}
for item in hash_strs:
save_paths[item.get("hash")] = item.get("save_path")
# 查询可辅种数据
seed_list, msg = self.iyuuhelper.get_seed_info(hashs)
if not isinstance(seed_list, dict):
logger.warn(f"当前种子列表没有可辅种的站点:{msg}")
return
else:
logger.info(f"IYUU返回可辅种数{len(seed_list)}")
# 遍历
for current_hash, seed_info in seed_list.items():
if not seed_info:
continue
seed_torrents = seed_info.get("torrent")
if not isinstance(seed_torrents, list):
seed_torrents = [seed_torrents]
# 本次辅种成功的种子
success_torrents = []
for seed in seed_torrents:
if not seed:
continue
if not isinstance(seed, dict):
continue
if not seed.get("sid") or not seed.get("info_hash"):
continue
if seed.get("info_hash") in hashs:
logger.info(f"{seed.get('info_hash')} 已在下载器中,跳过 ...")
continue
if seed.get("info_hash") in self._success_caches:
logger.info(f"{seed.get('info_hash')} 已处理过辅种,跳过 ...")
continue
if seed.get("info_hash") in self._error_caches or seed.get("info_hash") in self._permanent_error_caches:
logger.info(f"种子 {seed.get('info_hash')} 辅种失败且已缓存,跳过 ...")
continue
# 添加任务
success = self.__download_torrent(seed=seed,
downloader=downloader,
save_path=save_paths.get(current_hash))
if success:
success_torrents.append(seed.get("info_hash"))
# 辅种成功的去重放入历史
if len(success_torrents) > 0:
self.__save_history(current_hash=current_hash,
downloader=downloader,
success_torrents=success_torrents)
logger.info(f"下载器 {downloader} 辅种完成")
def __save_history(self, current_hash: str, downloader: str, success_torrents: []):
"""
[
{
"downloader":"2",
"torrents":[
"248103a801762a66c201f39df7ea325f8eda521b",
"bd13835c16a5865b01490962a90b3ec48889c1f0"
]
},
{
"downloader":"3",
"torrents":[
"248103a801762a66c201f39df7ea325f8eda521b",
"bd13835c16a5865b01490962a90b3ec48889c1f0"
]
}
]
"""
try:
# 查询当前Hash的辅种历史
seed_history = self.get_data(key=current_hash) or []
new_history = True
if len(seed_history) > 0:
for history in seed_history:
if not history:
continue
if not isinstance(history, dict):
continue
if not history.get("downloader"):
continue
# 如果本次辅种下载器之前有过记录则继续添加
if int(history.get("downloader")) == downloader:
history_torrents = history.get("torrents") or []
history["torrents"] = list(set(history_torrents + success_torrents))
new_history = False
break
# 本次辅种下载器之前没有成功记录则新增
if new_history:
seed_history.append({
"downloader": downloader,
"torrents": list(set(success_torrents))
})
# 保存历史
self.save_data(key=current_hash,
value=seed_history)
except Exception as e:
print(str(e))
def __download_torrent(self, seed: dict, downloader: str, save_path: str):
"""
下载种子
torrent: {
"sid": 3,
"torrent_id": 377467,
"info_hash": "a444850638e7a6f6220e2efdde94099c53358159"
}
"""
self.total += 1
# 获取种子站点及下载地址模板
site_url, download_page = self.iyuuhelper.get_torrent_url(seed.get("sid"))
if not site_url or not download_page:
# 加入缓存
self._error_caches.append(seed.get("info_hash"))
self.fail += 1
self.cached += 1
return False
# 查询站点
site_info = self.sites.get_sites(siteurl=site_url)
if not site_info:
logger.debug(f"没有维护种子对应的站点:{site_url}")
return False
if self._sites and str(site_info.get("id")) not in self._sites:
logger.info("当前站点不在选择的辅助站点范围,跳过 ...")
return False
self.realtotal += 1
# TODO 查询hash值是否已经在下载器中
torrent_info = []
if torrent_info:
logger.debug(f"{seed.get('info_hash')} 已在下载器中,跳过 ...")
self.exist += 1
return False
# 站点流控
if self.sites.check_ratelimit(site_info.get("id")):
self.fail += 1
return False
# 下载种子
torrent_url = self.__get_download_url(seed=seed,
site=site_info,
base_url=download_page)
if not torrent_url:
# 加入失败缓存
self._error_caches.append(seed.get("info_hash"))
self.fail += 1
self.cached += 1
return False
# 强制使用Https
if "?" in torrent_url:
torrent_url += "&https=1"
else:
torrent_url += "?https=1"
# TODO 添加下载,辅种任务默认暂停
download_id, retmsg = None, ""
if not download_id:
# 下载失败
logger.warn(f"添加下载任务出错,"
f"错误原因:{retmsg or '下载器添加任务失败'}"
f"种子链接:{torrent_url}")
self.fail += 1
# 加入失败缓存
if retmsg and ('无法打开链接' in retmsg or '触发站点流控' in retmsg):
self._error_caches.append(seed.get("info_hash"))
else:
# 种子不存在的情况
self._permanent_error_caches.append(seed.get("info_hash"))
return False
else:
self.success += 1
# 追加校验任务
logger.info(f"添加校验检查任务:{download_id} ...")
if not self._recheck_torrents.get(downloader):
self._recheck_torrents[downloader] = []
self._recheck_torrents[downloader].append(download_id)
# 下载成功
logger.info(f"成功添加辅种下载,站点:{site_info.get('name')},种子链接:{torrent_url}")
# TR会自动校验
if downloader == "qbittorrent":
# TODO 开始校验种子
pass
# 成功也加入缓存,有一些改了路径校验不通过的,手动删除后,下一次又会辅上
self._success_caches.append(seed.get("info_hash"))
return True
@staticmethod
def __get_hash(torrent: Any, dl_type: str):
"""
获取种子hash
"""
try:
return torrent.get("hash") if dl_type == "qbittorrent" else torrent.hashString
except Exception as e:
print(str(e))
return ""
@staticmethod
def __get_label(torrent: Any, dl_type: str):
"""
获取种子标签
"""
try:
return torrent.get("tags") or [] if dl_type == "qbittorrent" else torrent.labels or []
except Exception as e:
print(str(e))
return []
@staticmethod
def __can_seeding(torrent: Any, dl_type: str):
"""
判断种子是否可以做种并处于暂停状态
"""
try:
return torrent.get("state") == "pausedUP" if dl_type == "qbittorrent" \
else (torrent.status.stopped and torrent.percent_done == 1)
except Exception as e:
print(str(e))
return False
@staticmethod
def __get_save_path(torrent: Any, dl_type: str):
"""
获取种子保存路径
"""
try:
return torrent.get("save_path") if dl_type == "qbittorrent" else torrent.download_dir
except Exception as e:
print(str(e))
return ""
def __get_download_url(self, seed: dict, site: dict, base_url: str):
"""
拼装种子下载链接
"""
def __is_special_site(url):
"""
判断是否为特殊站点
"""
spec_params = ["hash=", "authkey="]
if any(field in base_url for field in spec_params):
return True
if "hdchina.org" in url:
return True
if "hdsky.me" in url:
return True
if "hdcity.in" in url:
return True
if "totheglory.im" in url:
return True
return False
try:
if __is_special_site(site.get('strict_url')):
# 从详情页面获取下载链接
return self.__get_torrent_url_from_page(seed=seed, site=site)
else:
download_url = base_url.replace(
"id={}",
"id={id}"
).replace(
"/{}",
"/{id}"
).replace(
"/{torrent_key}",
""
).format(
**{
"id": seed.get("torrent_id"),
"passkey": site.get("passkey") or '',
"uid": site.get("uid") or '',
}
)
if download_url.count("{"):
logger.warn(f"当前不支持该站点的辅助任务Url转换失败{seed}")
return None
download_url = re.sub(r"[&?]passkey=", "",
re.sub(r"[&?]uid=", "",
download_url,
flags=re.IGNORECASE),
flags=re.IGNORECASE)
return f"{site.get('strict_url')}/{download_url}"
except Exception as e:
logger.warn(f"站点 {site.get('name')} Url转换失败{str(e)},尝试通过详情页面获取种子下载链接 ...")
return self.__get_torrent_url_from_page(seed=seed, site=site)
def __get_torrent_url_from_page(self, seed: dict, site: dict):
"""
从详情页面获取下载链接
"""
try:
page_url = f"{site.get('strict_url')}/details.php?id={seed.get('torrent_id')}&hit=1"
logger.info(f"正在获取种子下载链接:{page_url} ...")
res = RequestUtils(
cookies=site.get("cookie"),
headers=site.get("ua"),
proxies=settings.PROXY if site.get("proxy") else None
).get_res(url=page_url)
if res is not None and res.status_code in (200, 500):
if "charset=utf-8" in res.text or "charset=UTF-8" in res.text:
res.encoding = "UTF-8"
else:
res.encoding = res.apparent_encoding
if not res.text:
logger.warn(f"获取种子下载链接失败,页面内容为空:{page_url}")
return None
# 使用xpath从页面中获取下载链接
html = etree.HTML(res.text)
for xpath in self._torrent_xpaths:
download_url = html.xpath(xpath)
if download_url:
download_url = download_url[0]
logger.info(f"获取种子下载链接成功:{download_url}")
if not download_url.startswith("http"):
if download_url.startswith("/"):
download_url = f"{site.get('strict_url')}{download_url}"
else:
download_url = f"{site.get('strict_url')}/{download_url}"
return download_url
logger.warn(f"获取种子下载链接失败,未找到下载链接:{page_url}")
return None
else:
logger.error(f"获取种子下载链接失败,请求失败:{page_url}{res.status_code if res else ''}")
return None
except Exception as e:
logger.warn(f"获取种子下载链接失败:{str(e)}")
return None
def stop_service(self):
"""
退出插件
"""
try:
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._event.set()
self._scheduler.shutdown()
self._event.clear()
self._scheduler = None
except Exception as e:
print(str(e))

View File

@ -0,0 +1,166 @@
import hashlib
import json
import time
from app.utils.http import RequestUtils
class IyuuHelper(object):
_version = "2.0.0"
_api_base = "https://api.iyuu.cn/%s"
_sites = {}
_token = None
def __init__(self, token):
self._token = token
if self._token:
self.init_config()
def init_config(self):
pass
def __request_iyuu(self, url, method="get", params=None):
"""
向IYUUApi发送请求
"""
if params:
if not params.get("sign"):
params.update({"sign": self._token})
if not params.get("version"):
params.update({"version": self._version})
else:
params = {"sign": self._token, "version": self._version}
# 开始请求
if method == "get":
ret = RequestUtils(
accept_type="application/json"
).get_res(f"{url}", params=params)
else:
ret = RequestUtils(
accept_type="application/json"
).post_res(f"{url}", data=params)
if ret:
result = ret.json()
if result.get('ret') == 200:
return result.get('data'), ""
else:
return None, f"请求IYUU失败状态码{result.get('ret')},返回信息:{result.get('msg')}"
elif ret is not None:
return None, f"请求IYUU失败状态码{ret.status_code},错误原因:{ret.reason}"
else:
return None, f"请求IYUU失败未获取到返回信息"
def get_torrent_url(self, sid):
if not sid:
return None, None
if not self._sites:
self._sites = self.__get_sites()
if not self._sites.get(sid):
return None, None
site = self._sites.get(sid)
return site.get('base_url'), site.get('download_page')
def __get_sites(self):
"""
返回支持辅种的全部站点
:return: 站点列表错误信息
{
"ret": 200,
"data": {
"sites": [
{
"id": 1,
"site": "keepfrds",
"nickname": "朋友",
"base_url": "pt.keepfrds.com",
"download_page": "download.php?id={}&passkey={passkey}",
"reseed_check": "passkey",
"is_https": 2
},
]
}
}
"""
result, msg = self.__request_iyuu(url=self._api_base % 'App.Api.Sites')
if result:
ret_sites = {}
sites = result.get('sites') or []
for site in sites:
ret_sites[site.get('id')] = site
return ret_sites
else:
print(msg)
return {}
def get_seed_info(self, info_hashs: list):
"""
返回info_hash对应的站点id种子id
{
"ret": 200,
"data": [
{
"sid": 3,
"torrent_id": 377467,
"info_hash": "a444850638e7a6f6220e2efdde94099c53358159"
},
{
"sid": 7,
"torrent_id": 35538,
"info_hash": "cf7d88fd656d10fe5130d13567aec27068b96676"
}
],
"msg": "",
"version": "1.0.0"
}
"""
info_hashs.sort()
json_data = json.dumps(info_hashs, separators=(',', ':'), ensure_ascii=False)
sha1 = self.get_sha1(json_data)
result, msg = self.__request_iyuu(url=self._api_base % 'App.Api.Infohash',
method="post",
params={
"timestamp": time.time(),
"hash": json_data,
"sha1": sha1
})
return result, msg
@staticmethod
def get_sha1(json_str) -> str:
return hashlib.sha1(json_str.encode('utf-8')).hexdigest()
def get_auth_sites(self):
"""
返回支持鉴权的站点列表
[
{
"id": 2,
"site": "pthome",
"bind_check": "passkey,uid"
}
]
"""
result, msg = self.__request_iyuu(url=self._api_base % 'App.Api.GetRecommendSites')
if result:
return result.get('recommend') or []
else:
print(msg)
return []
def bind_site(self, site, passkey, uid):
"""
绑定站点
:param site: 站点名称
:param passkey: passkey
:param uid: 用户id
:return: 状态码错误信息
"""
result, msg = self.__request_iyuu(url=self._api_base % 'App.Api.Bind',
method="get",
params={
"token": self._token,
"site": site,
"passkey": self.get_sha1(passkey),
"id": uid
})
return result, msg

View File

@ -0,0 +1,514 @@
import os
from datetime import datetime, timedelta
from pathlib import Path
from threading import Event
from typing import Any, List, Dict, Tuple
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from app.core.config import settings
from app.log import logger
from app.plugins import _PluginBase
class TorrentTransfer(_PluginBase):
# 插件名称
plugin_name = "自动转移做种"
# 插件描述
plugin_desc = "定期转移下载器中的做种任务到另一个下载器。"
# 插件图标
plugin_icon = "torrenttransfer.jpg"
# 主题色
plugin_color = "#272636"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "jxxghp"
# 作者主页
author_url = "https://github.com/jxxghp"
# 插件配置项ID前缀
plugin_config_prefix = "torrenttransfer_"
# 加载顺序
plugin_order = 18
# 可使用的用户级别
auth_level = 2
# 私有属性
_scheduler = None
sites = None
# 开关
_enabled = False
_cron = None
_onlyonce = False
_fromdownloader = None
_todownloader = None
_frompath = None
_topath = None
_notify = False
_nolabels = None
_nopaths = None
_deletesource = False
_fromtorrentpath = None
_autostart = False
# 退出事件
_event = Event()
# 待检查种子清单
_recheck_torrents = {}
_is_recheck_running = False
# 任务标签
_torrent_tags = ["已整理", "转移做种"]
def init_plugin(self, config: dict = None):
# 读取配置
if config:
self._enabled = config.get("enabled")
self._onlyonce = config.get("onlyonce")
self._cron = config.get("cron")
self._notify = config.get("notify")
self._nolabels = config.get("nolabels")
self._frompath = config.get("frompath")
self._topath = config.get("topath")
self._fromdownloader = config.get("fromdownloader")
self._todownloader = config.get("todownloader")
self._deletesource = config.get("deletesource")
self._fromtorrentpath = config.get("fromtorrentpath")
self._nopaths = config.get("nopaths")
self._autostart = config.get("autostart")
# 停止现有任务
self.stop_service()
# 启动定时任务 & 立即运行一次
if self.get_state() or self._onlyonce:
# 检查配置
if self._fromtorrentpath and not Path(self._fromtorrentpath).exists():
logger.error(f"源下载器种子文件保存路径不存在:{self._fromtorrentpath}")
return
if self._fromdownloader == self._todownloader:
logger.error(f"源下载器和目的下载器不能相同")
self.systemmessage(f"源下载器和目的下载器不能相同")
return
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
if self._cron:
logger.info(f"移转做种服务启动,周期:{self._cron}")
try:
self._scheduler.add_job(self.transfer,
CronTrigger.from_crontab(self._cron))
except Exception as e:
logger.error(f"移转做种服务启动失败:{e}")
self.systemmessage(f"移转做种服务启动失败:{e}")
return
if self._onlyonce:
logger.info(f"移转做种服务启动,立即运行一次")
self._scheduler.add_job(self.transfer, 'date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(
seconds=3))
# 关闭一次性开关
self._onlyonce = False
self.update_config({
"enable": self._enabled,
"onlyonce": self._onlyonce,
"cron": self._cron,
"notify": self._notify,
"nolabels": self._nolabels,
"frompath": self._frompath,
"topath": self._topath,
"fromdownloader": self._fromdownloader,
"todownloader": self._todownloader,
"deletesource": self._deletesource,
"fromtorrentpath": self._fromtorrentpath,
"nopaths": self._nopaths,
"autostart": self._autostart
})
if self._scheduler.get_jobs():
if self._autostart:
# 追加种子校验服务
self._scheduler.add_job(self.check_recheck, 'interval', minutes=3)
# 启动服务
self._scheduler.print_jobs()
self._scheduler.start()
def get_state(self):
return True if self._enabled \
and self._cron \
and self._fromdownloader \
and self._todownloader \
and self._fromtorrentpath else False
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1页面配置2数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'notify',
'label': '发送通知',
}
}
]
}
]
}
]
}
], {
"enable": False,
"notify": False,
"onlyonce": False,
"cron": "",
"nolabels": "",
"frompath": "",
"topath": "",
"fromdownloader": "",
"todownloader": "",
"deletesource": False,
"fromtorrentpath": "",
"nopaths": "",
"autostart": True
}
def get_page(self) -> List[dict]:
pass
def transfer(self):
"""
开始移转做种
"""
if not self._enabled \
or not self._fromdownloader \
or not self._todownloader \
or not self._fromtorrentpath:
logger.warn("移转做种服务未启用或未配置")
return
logger.info("开始移转做种任务 ...")
# 源下载器
downloader = self._fromdownloader
# 目的下载器
todownloader = self._todownloader
# TODO 获取下载器中已完成的种子
torrents = []
if torrents:
logger.info(f"下载器 {downloader} 已完成种子数:{len(torrents)}")
else:
logger.info(f"下载器 {downloader} 没有已完成种子")
return
# 过滤种子,记录保存目录
hash_strs = []
for torrent in torrents:
if self._event.is_set():
logger.info(f"移转服务停止")
return
# 获取种子hash
hash_str = self.__get_hash(torrent, downloader)
# 获取保存路径
save_path = self.__get_save_path(torrent, downloader)
if self._nopaths and save_path:
# 过滤不需要移转的路径
nopath_skip = False
for nopath in self._nopaths.split('\n'):
if os.path.normpath(save_path).startswith(os.path.normpath(nopath)):
logger.info(f"种子 {hash_str} 保存路径 {save_path} 不需要移转,跳过 ...")
nopath_skip = True
break
if nopath_skip:
continue
# 获取种子标签
torrent_labels = self.__get_label(torrent, downloader)
if torrent_labels and self._nolabels:
is_skip = False
for label in self._nolabels.split(','):
if label in torrent_labels:
logger.info(f"种子 {hash_str} 含有不转移标签 {label},跳过 ...")
is_skip = True
break
if is_skip:
continue
hash_strs.append({
"hash": hash_str,
"save_path": save_path
})
# 开始转移任务
if hash_strs:
logger.info(f"需要移转的种子数:{len(hash_strs)}")
# 记数
total = len(hash_strs)
success = 0
fail = 0
for hash_item in hash_strs:
# 检查种子文件是否存在
torrent_file = os.path.join(self._fromtorrentpath,
f"{hash_item.get('hash')}.torrent")
if not os.path.exists(torrent_file):
logger.error(f"种子文件不存在:{torrent_file}")
fail += 1
continue
# TODO 查询hash值是否已经在目的下载器中
torrent_info = []
if torrent_info:
logger.debug(f"{hash_item.get('hash')} 已在目的下载器中,跳过 ...")
continue
# 转换保存路径
download_dir = self.__convert_save_path(hash_item.get('save_path'),
self._frompath,
self._topath)
if not download_dir:
logger.error(f"转换保存路径失败:{hash_item.get('save_path')}")
fail += 1
continue
# 如果是QB检查是否有Tracker没有的话补充解析
if downloader == "qbittorrent":
# TODO 读取种子内容、解析种子文件
content, retmsg = None, ""
if not content:
logger.error(f"读取种子文件失败:{retmsg}")
fail += 1
continue
# TODO 读取trackers
try:
torrent_main = None
main_announce = None
except Exception as err:
logger.error(f"解析种子文件 {torrent_file} 失败:{err}")
fail += 1
continue
if not main_announce:
logger.info(f"{hash_item.get('hash')} 未发现tracker信息尝试补充tracker信息...")
# 读取fastresume文件
fastresume_file = os.path.join(self._fromtorrentpath,
f"{hash_item.get('hash')}.fastresume")
if not os.path.exists(fastresume_file):
logger.error(f"fastresume文件不存在{fastresume_file}")
fail += 1
continue
# 尝试补充trackers
try:
with open(fastresume_file, 'rb') as f:
fastresume = f.read()
# TODO 解析fastresume文件
torrent_fastresume = None
# TODO 读取trackers
fastresume_trackers = None
if isinstance(fastresume_trackers, list) \
and len(fastresume_trackers) > 0 \
and fastresume_trackers[0]:
# 重新赋值
torrent_main['announce'] = fastresume_trackers[0][0]
# 替换种子文件路径
torrent_file = settings.TEMP_PATH / f"{hash_item.get('hash')}.torrent"
# TODO 编码并保存到临时文件
with open(torrent_file, 'wb') as f:
pass
except Exception as err:
logger.error(f"解析fastresume文件 {fastresume_file} 失败:{err}")
fail += 1
continue
# TODO 发送到另一个下载器中下载:默认暂停、传输下载路径、关闭自动管理模式
download_id, retmsg = None, ""
if not download_id:
# 下载失败
logger.warn(f"添加转移任务出错,"
f"错误原因:{retmsg or '下载器添加任务失败'}"
f"种子文件:{torrent_file}")
fail += 1
continue
else:
# 追加校验任务
logger.info(f"添加校验检查任务:{download_id} ...")
if not self._recheck_torrents.get(todownloader):
self._recheck_torrents[todownloader] = []
self._recheck_torrents[todownloader].append(download_id)
# 下载成功
logger.info(f"成功添加转移做种任务,种子文件:{torrent_file}")
# TR会自动校验
if downloader == "qbittorrent":
# TODO 开始校验种子
pass
# TODO 删除源种子,不能删除文件!
if self._deletesource:
pass
success += 1
# 插入转种记录
history_key = "%s-%s" % (int(self._fromdownloader[0]), hash_item.get('hash'))
self.save_data(key=history_key,
value={
"to_download": int(self._todownloader[0]),
"to_download_id": download_id,
"delete_source": self._deletesource,
})
# 触发校验任务
if success > 0 and self._autostart:
self.check_recheck()
# 发送通知
if self._notify:
self.post_message(
title="【移转做种任务执行完成】",
text=f"总数:{total},成功:{success},失败:{fail}"
)
else:
logger.info(f"没有需要移转的种子")
logger.info("移转做种任务执行完成")
def check_recheck(self):
"""
定时检查下载器中种子是否校验完成校验完成且完整的自动开始辅种
"""
if not self._recheck_torrents:
return
if not self._todownloader:
return
if self._is_recheck_running:
return
downloader = self._todownloader
# 需要检查的种子
recheck_torrents = self._recheck_torrents.get(downloader, [])
if not recheck_torrents:
return
logger.info(f"开始检查下载器 {downloader} 的校验任务 ...")
self._is_recheck_running = True
# TODO 获取下载器中的种子
torrents = []
if torrents:
can_seeding_torrents = []
for torrent in torrents:
# 获取种子hash
hash_str = self.__get_hash(torrent, downloader)
if self.__can_seeding(torrent, downloader):
can_seeding_torrents.append(hash_str)
if can_seeding_torrents:
logger.info(f"{len(can_seeding_torrents)} 个任务校验完成,开始辅种 ...")
# TODO 开始辅种
# 去除已经处理过的种子
self._recheck_torrents[downloader] = list(
set(recheck_torrents).difference(set(can_seeding_torrents)))
elif torrents is None:
logger.info(f"下载器 {downloader} 查询校验任务失败,将在下次继续查询 ...")
else:
logger.info(f"下载器 {downloader} 中没有需要检查的校验任务,清空待处理列表 ...")
self._recheck_torrents[downloader] = []
self._is_recheck_running = False
@staticmethod
def __get_hash(torrent: Any, dl_type: str):
"""
获取种子hash
"""
try:
return torrent.get("hash") if dl_type == "qbittorrent" else torrent.hashString
except Exception as e:
print(str(e))
return ""
@staticmethod
def __get_label(torrent: Any, dl_type: str):
"""
获取种子标签
"""
try:
return torrent.get("tags") or [] if dl_type == "qbittorrent" else torrent.labels or []
except Exception as e:
print(str(e))
return []
@staticmethod
def __get_save_path(torrent: Any, dl_type: str):
"""
获取种子保存路径
"""
try:
return torrent.get("save_path") if dl_type == "qbittorrent" else torrent.download_dir
except Exception as e:
print(str(e))
return ""
@staticmethod
def __can_seeding(torrent: Any, dl_type: str):
"""
判断种子是否可以做种并处于暂停状态
"""
try:
return torrent.get("state") == "pausedUP" and torrent.get("tracker") if dl_type == "qbittorrent" \
else (torrent.status.stopped and torrent.percent_done == 1 and torrent.trackers)
except Exception as e:
print(str(e))
return False
@staticmethod
def __convert_save_path(save_path: str, from_root: str, to_root: str):
"""
转换保存路径
"""
try:
# 没有保存目录,以目的根目录为准
if not save_path:
return to_root
# 没有设置根目录时返回save_path
if not to_root or not from_root:
return save_path
# 统一目录格式
save_path = os.path.normpath(save_path).replace("\\", "/")
from_root = os.path.normpath(from_root).replace("\\", "/")
to_root = os.path.normpath(to_root).replace("\\", "/")
# 替换根目录
if save_path.startswith(from_root):
return save_path.replace(from_root, to_root, 1)
except Exception as e:
print(str(e))
return None
def stop_service(self):
"""
退出插件
"""
try:
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._event.set()
self._scheduler.shutdown()
self._event.clear()
self._scheduler = None
except Exception as e:
print(str(e))