Merge pull request #345 from thsrite/main

feat 下载器种子同步插件 && fix 同步删除插件
This commit is contained in:
jxxghp 2023-08-31 15:11:58 +08:00 committed by GitHub
commit 1fcdf633ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 591 additions and 203 deletions

View File

@ -39,12 +39,13 @@ class DownloadHistoryOper(DbOper):
downloadfile = DownloadFiles(**file_item)
downloadfile.create(self._db)
def get_files_by_hash(self, download_hash: str) -> List[DownloadFiles]:
def get_files_by_hash(self, download_hash: str, state: int = None) -> List[DownloadFiles]:
"""
按Hash查询下载文件记录
:param download_hash: 数据key
:param state: 删除状态
"""
return DownloadFiles.get_by_hash(self._db, download_hash)
return DownloadFiles.get_by_hash(self._db, download_hash, state)
def get_file_by_fullpath(self, fullpath: str) -> DownloadFiles:
"""
@ -60,6 +61,13 @@ class DownloadHistoryOper(DbOper):
"""
return DownloadFiles.get_by_savepath(self._db, fullpath)
def delete_file_by_fullpath(self, fullpath: str):
"""
按fullpath删除下载文件记录
:param fullpath: 数据key
"""
DownloadFiles.delete_file_by_fullpath(self._db, fullpath)
def list_by_page(self, page: int = 1, count: int = 30) -> List[DownloadHistory]:
"""
分页查询下载历史

View File

@ -112,13 +112,27 @@ class DownloadFiles(Base):
state = Column(Integer, nullable=False, default=1)
@staticmethod
def get_by_hash(db: Session, download_hash: str):
def get_by_hash(db: Session, download_hash: str, state: int = None):
if state:
return db.query(DownloadFiles).filter(DownloadFiles.download_hash == download_hash,
DownloadFiles.state == state).all()
else:
return db.query(DownloadFiles).filter(DownloadFiles.download_hash == download_hash).all()
@staticmethod
def get_by_fullpath(db: Session, fullpath: str):
return db.query(DownloadFiles).filter(DownloadFiles.fullpath == fullpath).first()
return db.query(DownloadFiles).filter(DownloadFiles.fullpath == fullpath).order_by(
DownloadHistory.id.desc()).first()
@staticmethod
def get_by_savepath(db: Session, savepath: str):
return db.query(DownloadFiles).filter(DownloadFiles.savepath == savepath).all()
@staticmethod
def delete_by_fullpath(db: Session, fullpath: str):
return db.query(DownloadFiles).filter(DownloadFiles.fullpath == fullpath,
DownloadFiles.state == 1).update(
{
"state": 0
}
)

View File

@ -114,3 +114,11 @@ class TransferHistory(Base):
TransferHistory.year == year,
TransferHistory.seasons == season,
TransferHistory.episodes == episode).all()
@staticmethod
def update_download_hash(db: Session, historyid: int = None, download_hash: str = None):
db.query(TransferHistory).filter(TransferHistory.id == historyid).update(
{
"download_hash": download_hash
}
)

View File

@ -79,3 +79,9 @@ class TransferHistoryOper(DbOper):
if transferhistory:
transferhistory.delete(self._db, transferhistory.id)
return TransferHistory(**kwargs).create(self._db)
def update_download_hash(self, historyid, download_hash):
"""
补充转移记录download_hash
"""
TransferHistory.update_download_hash(self._db, historyid, download_hash)

View File

@ -114,7 +114,8 @@ class AutoSignIn(_PluginBase):
self.__update_config()
# 周期运行
if self._enabled and self._cron:
if self._enabled:
if self._cron:
try:
if self._cron.strip().count(" ") == 4:
self._scheduler.add_job(func=self.sign_in,

View File

@ -12,6 +12,7 @@ from apscheduler.triggers.cron import CronTrigger
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.models.transferhistory import TransferHistory
from app.db.transferhistory_oper import TransferHistoryOper
from app.log import logger
@ -57,11 +58,13 @@ class MediaSyncDel(_PluginBase):
_del_source = False
_exclude_path = None
_transferhis = None
_downloadhis = None
qb = None
tr = None
def init_plugin(self, config: dict = None):
self._transferhis = TransferHistoryOper(self.db)
self._downloadhis = DownloadHistoryOper(self.db)
self.episode = Episode()
self.qb = Qbittorrent()
self.tr = Transmission()
@ -518,13 +521,16 @@ class MediaSyncDel(_PluginBase):
year = None
del_cnt = 0
stop_cnt = 0
error_cnt = 0
for transferhis in transfer_history:
image = transferhis.image
year = transferhis.year
# 删除种子任务
if self._del_source:
# 0、删除转移记录
self._transferhis.delete(transferhis.id)
# 删除种子任务
if self._del_source:
# 1、直接删除源文件
if transferhis.src and Path(transferhis.src).suffix in settings.RMT_MEDIAEXT:
source_name = os.path.basename(transferhis.src)
@ -534,11 +540,14 @@ class MediaSyncDel(_PluginBase):
if transferhis.download_hash:
try:
# 2、判断种子是否被删除完
delete_flag, stop_flag = self.handle_torrent(src=source_path,
delete_flag, success_flag = self.handle_torrent(src=transferhis.src,
torrent_hash=transferhis.download_hash)
if not success_flag:
error_cnt += 1
else:
if delete_flag:
del_cnt += 1
if stop_flag:
else:
stop_cnt += 1
except Exception as e:
logger.error("删除种子失败,尝试删除源文件:%s" % str(e))
@ -560,7 +569,9 @@ class MediaSyncDel(_PluginBase):
title="媒体库同步删除任务完成",
image=image,
text=f"{msg}\n"
f"数量 删除{del_cnt}个 暂停{stop_cnt}\n"
f"删除{del_cnt}\n"
f"暂停{stop_cnt}\n"
f"错误{error_cnt}\n"
f"时间 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}"
)
@ -668,13 +679,14 @@ class MediaSyncDel(_PluginBase):
image = 'https://emby.media/notificationicon.png'
del_cnt = 0
stop_cnt = 0
error_cnt = 0
for transferhis in transfer_history:
image = transferhis.image
self._transferhis.delete(transferhis.id)
# 删除种子任务
if self._del_source:
# 0、删除转移记录
self._transferhis.delete(transferhis.id)
# 删除种子任务
if self._del_source:
# 1、直接删除源文件
if transferhis.src and Path(transferhis.src).suffix in settings.RMT_MEDIAEXT:
source_name = os.path.basename(transferhis.src)
@ -684,11 +696,14 @@ class MediaSyncDel(_PluginBase):
if transferhis.download_hash:
try:
# 2、判断种子是否被删除完
delete_flag, stop_flag = self.handle_torrent(src=source_path,
delete_flag, success_flag = self.handle_torrent(src=transferhis.src,
torrent_hash=transferhis.download_hash)
if not success_flag:
error_cnt += 1
else:
if delete_flag:
del_cnt += 1
if stop_flag:
else:
stop_cnt += 1
except Exception as e:
logger.error("删除种子失败,尝试删除源文件:%s" % str(e))
@ -701,7 +716,9 @@ class MediaSyncDel(_PluginBase):
mtype=NotificationType.MediaServer,
title="媒体库同步删除任务完成",
text=f"{msg}\n"
f"数量 删除{del_cnt}个 暂停{stop_cnt}\n"
f"删除{del_cnt}\n"
f"暂停{stop_cnt}\n"
f"错误{error_cnt}\n"
f"时间 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}",
image=image)
@ -735,124 +752,77 @@ class MediaSyncDel(_PluginBase):
plugin_id=plugin_id)
logger.info(f"查询到 {history_key} 转种历史 {transfer_history}")
# 删除历史标志
del_history = False
# 删除种子标志
try:
# 删除本次种子记录
self._downloadhis.delete_file_by_fullpath(fullpath=src)
# 根据种子hash查询剩余未删除的记录
downloadHisNoDel = self._downloadhis.get_files_by_hash(download_hash=torrent_hash, state=1)
if downloadHisNoDel and len(downloadHisNoDel) > 0:
logger.info(
f"查询种子任务 {torrent_hash} 存在 {len(downloadHisNoDel)} 个未删除文件,执行暂停种子操作")
delete_flag = False
else:
logger.info(
f"查询种子任务 {torrent_hash} 文件已全部删除,执行删除种子操作")
delete_flag = True
# 是否需要暂停源下载器种子
stop_flag = False
# 如果有转种记录,则删除转种后的下载任务
if transfer_history and isinstance(transfer_history, dict):
download = transfer_history['to_download']
download_id = transfer_history['to_download_id']
delete_source = transfer_history['delete_source']
del_history = True
# 删除种子
if delete_flag:
# 删除转种记录
self.del_data(key=history_key, plugin_id=plugin_id)
# 转种后未删除源种时,同步删除源种
if not delete_source:
logger.info(f"{history_key} 转种时未删除源下载任务,开始删除源下载任务…")
try:
dl_files = self.chain.torrent_files(tid=torrent_hash)
if not dl_files:
logger.info(f"未获取到 {settings.DOWNLOADER} - {torrent_hash} 种子文件,种子已被删除")
else:
for dl_file in dl_files:
dl_file_name = dl_file.get("name")
torrent_file = os.path.join(src, os.path.basename(dl_file_name))
if Path(torrent_file).exists():
logger.warn(f"种子有文件被删除,种子文件{torrent_file}暂未删除,暂停种子")
delete_flag = False
stop_flag = True
break
if delete_flag:
logger.info(f"删除下载任务:{settings.DOWNLOADER} - {torrent_hash}")
# 删除源种子
logger.info(f"删除源下载器下载任务:{settings.DOWNLOADER} - {torrent_hash}")
self.chain.remove_torrents(torrent_hash)
except Exception as e:
logger.error(f"删除源下载任务 {history_key} 失败: {str(e)}")
# 如果是False则说明种子文件没有完全被删除暂停种子暂不处理
if delete_flag:
try:
# 转种download
# 删除转种后任务
logger.info(f"删除转种后下载任务:{download} - {download_id}")
# 删除转种后下载任务
if download == "transmission":
dl_files = self.tr.get_files(tid=download_id)
if not dl_files:
logger.info(f"未获取到 {download} - {download_id} 种子文件,种子已被删除")
else:
for dl_file in dl_files:
dl_file_name = dl_file.name
if not transfer_history or not stop_flag:
torrent_file = os.path.join(src, os.path.basename(dl_file_name))
if Path(torrent_file).exists():
logger.info(f"种子有文件被删除,种子文件{torrent_file}暂未删除,暂停种子")
delete_flag = False
stop_flag = True
break
if delete_flag:
# 删除源下载任务或转种后下载任务
logger.info(f"删除下载任务:{download} - {download_id}")
self.tr.delete_torrents(delete_file=True,
ids=download_id)
# 删除转种记录
if del_history:
self.del_data(key=history_key, plugin_id=plugin_id)
# 处理辅种
self.__del_seed(download=download, download_id=download_id, action_flag="del")
else:
dl_files = self.qb.get_files(tid=download_id)
if not dl_files:
logger.info(f"未获取到 {download} - {download_id} 种子文件,种子已被删除")
else:
for dl_file in dl_files:
dl_file_name = dl_file.get("name")
if not transfer_history or not stop_flag:
torrent_file = os.path.join(src, os.path.basename(dl_file_name))
if Path(torrent_file).exists():
logger.info(f"种子有文件被删除,种子文件{torrent_file}暂未删除,暂停种子")
delete_flag = False
stop_flag = True
break
if delete_flag:
# 删除源下载任务或转种后下载任务
logger.info(f"删除下载任务:{download} - {download_id}")
self.qb.delete_torrents(delete_file=True,
ids=download_id)
else:
# 暂停种子
# 转种后未删除源种时,同步暂停源种
if not delete_source:
logger.info(f"{history_key} 转种时未删除源下载任务,开始暂停源下载任务…")
# 删除转种记录
if del_history:
self.del_data(key=history_key, plugin_id=plugin_id)
# 暂停源种子
logger.info(f"暂停源下载器下载任务:{settings.DOWNLOADER} - {torrent_hash}")
self.chain.stop_torrents(torrent_hash)
else:
# 未转种de情况
if delete_flag:
# 删除源种子
logger.info(f"删除源下载器下载任务:{download} - {download_id}")
self.chain.remove_torrents(download_id)
else:
# 暂停源种子
logger.info(f"暂停源下载器下载任务:{download} - {download_id}")
self.chain.stop_torrents(download_id)
# 处理辅种
self.__del_seed(download=download, download_id=download_id, action_flag="del")
self.__del_seed(download=download, download_id=download_id, action_flag="del" if delete_flag else 'stop')
return delete_flag, True
except Exception as e:
logger.error(f"删除转种辅种下载任务失败: {str(e)}")
# 判断是否暂停
if not delete_flag:
logger.error("开始暂停种子")
# 暂停种子
if stop_flag:
# 暂停源种
self.chain.stop_torrents(torrent_hash)
logger.info(f"种子:{settings.DOWNLOADER} - {torrent_hash} 暂停")
# 暂停转种
if del_history:
if download == "qbittorrent":
self.qb.stop_torrents(download_id)
logger.info(f"转种:{download} - {download_id} 暂停")
else:
self.tr.stop_torrents(download_id)
logger.info(f"转种:{download} - {download_id} 暂停")
# 暂停辅种
self.__del_seed(download=download, download_id=download_id, action_flag="stop")
return delete_flag, stop_flag
logger.error(f"删种失败: {e}")
return False, False
def __del_seed(self, download, download_id, action_flag):
"""

View File

@ -255,7 +255,10 @@ class NAStoolSync(_PluginBase):
# 转种后种子hash
transfer_hash = []
qb_torrents = []
tr_torrents = []
tr_torrents_all = []
if self._supp:
# 获取所有的转种数据
transfer_datas = self._plugindata.get_data_all("TorrentTransfer")
if transfer_datas:

View File

@ -0,0 +1,378 @@
import os
import time
from pathlib import Path
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.transferhistory_oper import TransferHistoryOper
from app.modules.qbittorrent import Qbittorrent
from app.modules.transmission import Transmission
from app.plugins import _PluginBase
from typing import Any, List, Dict, Tuple
from app.log import logger
class SyncDownloadFiles(_PluginBase):
# 插件名称
plugin_name = "SyncDownloadFiles"
# 插件描述
plugin_desc = "同步下载器文件记录。"
# 插件图标
plugin_icon = "sync_file.png"
# 主题色
plugin_color = "bg-blue"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "syncdownloadfiles_"
# 加载顺序
plugin_order = 20
# 可使用的用户级别
auth_level = 2
# 私有属性
qb = None
tr = None
_onlyonce = False
_history = False
_downloaders = []
_dirs = None
downloadhis = None
transferhis = None
def init_plugin(self, config: dict = None):
if config:
self._history = config.get('history')
self._onlyonce = config.get("onlyonce")
self._downloaders = config.get('downloaders') or []
self._dirs = config.get("dirs") or ""
if self._onlyonce:
# 执行一次
self.qb = Qbittorrent()
self.tr = Transmission()
self.downloadhis = DownloadHistoryOper(self.db)
self.transferhis = TransferHistoryOper(self.db)
# 关闭onlyonce
self._onlyonce = False
self.update_config({
"history": self._history,
"onlyonce": self._onlyonce,
"downloaders": self._downloaders,
"dirs": self._dirs
})
self.sync()
def sync(self):
"""
同步所选下载器种子记录
"""
if not self._downloaders:
logger.error("未选择同步下载器,停止运行")
return
# 遍历下载器同步记录
for downloader in self._downloaders:
# 获取最后同步时间
last_sync_time = self.get_data(f"last_sync_time_{downloader}")
logger.info(f"开始扫描下载器 {downloader} ...")
downloader_obj = self.__get_downloader(downloader)
# 获取下载器中已完成的种子
torrents = downloader_obj.get_completed_torrents()
# 排序种子,根据种子添加时间倒序
if downloader == "qbittorrent":
torrents = sorted(torrents, key=lambda x: x.get("added_on"), reverse=True)
else:
torrents = sorted(torrents, key=lambda x: x.added_date, reverse=True)
if torrents:
logger.info(f"下载器 {downloader} 已完成种子数:{len(torrents)}")
else:
logger.info(f"下载器 {downloader} 没有已完成种子")
continue
for torrent in torrents:
# 返回false标识后续种子已被同步
sync_flag = self.__compare_time(torrent, downloader, last_sync_time)
if not sync_flag:
logger.info(f"最后同步时间{last_sync_time}, 之前种子已被同步,结束当前下载器 {downloader} 任务")
break
# 获取种子hash
hash_str = self.__get_hash(torrent, downloader)
# 获取种子download_dir
download_dir = self.__get_download_dir(torrent, downloader)
# 获取种子name
torrent_name = self.__get_torrent_name(torrent, downloader)
# 获取种子文件
torrent_files = self.__get_torrent_files(torrent, downloader, downloader_obj)
logger.info(f"开始同步种子 {hash_str}, 文件数 {len(torrent_files)}")
# 处理路径映射
if self._dirs:
paths = self._dirs.split("\n")
for path in paths:
sub_paths = path.split(":")
download_dir = download_dir.replace(sub_paths[0], sub_paths[1]).replace('\\', '/')
download_files = []
for file in torrent_files:
file_name = self.__get_file_name(file, downloader)
full_path = Path(download_dir).joinpath(torrent_name, file_name)
if self._history:
transferhis = self.transferhis.get_by_src(str(full_path))
if transferhis and not transferhis.download_hash:
logger.info(f"开始补充转移记录 {transferhis.id} download_hash {hash_str}")
self.transferhis.update_download_hash(historyid=transferhis.id,
download_hash=hash_str)
# 种子文件记录
download_files.append(
{
"download_hash": hash_str,
"downloader": downloader,
"fullpath": str(full_path),
"savepath": str(Path(download_dir).joinpath(torrent_name)),
"filepath": file_name,
"torrentname": torrent_name,
}
)
if download_files:
# 登记下载文件
self.downloadhis.add_files(download_files)
logger.info(f"种子 {hash_str} 同步完成")
logger.info(f"下载器种子文件同步完成!")
self.save_data(f"last_sync_time_{downloader}",
time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
@staticmethod
def __compare_time(torrent: Any, dl_tpe: str, last_sync_time: str = None):
if last_sync_time:
# 获取种子时间
if dl_tpe == "qbittorrent":
torrent_date = time.gmtime(torrent.get("added_on")) # 将时间戳转换为时间元组
torrent_date = time.strftime("%Y-%m-%d %H:%M:%S", torrent_date) # 格式化时间
else:
torrent_date = torrent.added_date
# 之后的种子已经同步了
if last_sync_time > str(torrent_date):
return False
return True
@staticmethod
def __get_file_name(file: Any, dl_type: str):
"""
获取文件名
"""
try:
return os.path.basename(file.get("name")) if dl_type == "qbittorrent" else os.path.basename(file.name)
except Exception as e:
print(str(e))
return ""
@staticmethod
def __get_torrent_files(torrent: Any, dl_type: str, downloader_obj):
"""
获取种子文件
"""
try:
return torrent.files if dl_type == "qbittorrent" else downloader_obj.get_files(tid=torrent.id)
except Exception as e:
print(str(e))
return ""
@staticmethod
def __get_torrent_name(torrent: Any, dl_type: str):
"""
获取种子name
"""
try:
return torrent.get("name") if dl_type == "qbittorrent" else torrent.name
except Exception as e:
print(str(e))
return ""
@staticmethod
def __get_download_dir(torrent: Any, dl_type: str):
"""
获取种子download_dir
"""
try:
return torrent.get("save_path") if dl_type == "qbittorrent" else torrent.download_dir
except Exception as e:
print(str(e))
return ""
@staticmethod
def __get_hash(torrent: Any, dl_type: str):
"""
获取种子hash
"""
try:
return torrent.get("hash") if dl_type == "qbittorrent" else torrent.hashString
except Exception as e:
print(str(e))
return ""
def __get_downloader(self, dtype: str):
"""
根据类型返回下载器实例
"""
if dtype == "qbittorrent":
return self.qb
elif dtype == "transmission":
return self.tr
else:
return None
def get_state(self) -> bool:
return False
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1页面配置2数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'onlyonce',
'label': '运行一次',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'history',
'label': '补充转移记录',
}
}
]
},
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12
},
'content': [
{
'component': 'VSelect',
'props': {
'chips': True,
'multiple': True,
'model': 'downloaders',
'label': '同步下载器',
'items': [
{'title': 'Qbittorrent', 'value': 'qbittorrent'},
{'title': 'Transmission', 'value': 'transmission'}
]
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'dirs',
'label': '目录映射',
'rows': 5,
'placeholder': '每一行一个目录,下载器地址:mp地址'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'text': '如果所选下载器种子很多的话,时间会有点久,请耐心等候,可查看日志。'
}
}
]
}
]
}
]
}
], {
"onlyonce": False,
"history": False,
"dirs": "",
"downloaders": []
}
def get_page(self) -> List[dict]:
pass
def stop_service(self):
"""
退出插件
"""
pass