from pathlib import Path from typing import List, Any from fastapi import APIRouter, Depends from sqlalchemy.orm import Session from app import schemas from app.chain.transfer import TransferChain from app.core.security import verify_token from app.db import get_db from app.db.models.downloadhistory import DownloadHistory from app.db.models.transferhistory import TransferHistory from app.schemas import MediaType router = APIRouter() @router.get("/download", summary="查询下载历史记录", response_model=List[schemas.DownloadHistory]) def download_history(page: int = 1, count: int = 30, db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 查询下载历史记录 """ return DownloadHistory.list_by_page(db, page, count) @router.delete("/download", summary="删除下载历史记录", response_model=schemas.Response) def delete_download_history(history_in: schemas.DownloadHistory, db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 删除下载历史记录 """ DownloadHistory.delete(db, history_in.id) return schemas.Response(success=True) @router.get("/transfer", summary="查询转移历史记录", response_model=schemas.Response) def transfer_history(title: str = None, page: int = 1, count: int = 30, db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 查询转移历史记录 """ if title: total = TransferHistory.count_by_title(db, title) result = TransferHistory.list_by_title(db, title, page, count) else: result = TransferHistory.list_by_page(db, page, count) total = TransferHistory.count(db) return schemas.Response(success=True, data={ "list": result, "total": total, }) @router.delete("/transfer", summary="删除转移历史记录", response_model=schemas.Response) def delete_transfer_history(history_in: schemas.TransferHistory, deletesrc: bool = False, deletedest: bool = False, db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 删除转移历史记录 """ history = TransferHistory.get(db, history_in.id) if not history: return schemas.Response(success=False, msg="记录不存在") # 册除媒体库文件 if deletedest and history.dest: TransferChain(db).delete_files(Path(history.dest)) # 删除源文件 if deletesrc and history.src: TransferChain(db).delete_files(Path(history.src)) # 删除记录 TransferHistory.delete(db, history_in.id) return schemas.Response(success=True) @router.post("/transfer", summary="历史记录重新转移", response_model=schemas.Response) def redo_transfer_history(history_in: schemas.TransferHistory, mtype: str = None, new_tmdbid: int = None, db: Session = Depends(get_db), _: schemas.TokenPayload = Depends(verify_token)) -> Any: """ 历史记录重新转移,不输入 mtype 和 new_tmdbid 时,自动使用文件名重新识别 """ if mtype and new_tmdbid: state, errmsg = TransferChain(db).re_transfer(logid=history_in.id, mtype=MediaType(mtype), tmdbid=new_tmdbid) else: state, errmsg = TransferChain(db).re_transfer(logid=history_in.id) if state: return schemas.Response(success=True) else: return schemas.Response(success=False, message=errmsg)