184 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			184 lines
		
	
	
		
			5.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import shutil
 | |
| from pathlib import Path
 | |
| from typing import Any, List
 | |
| 
 | |
| from fastapi import APIRouter, Depends
 | |
| from starlette.responses import FileResponse, Response
 | |
| 
 | |
| from app import schemas
 | |
| from app.core.config import settings
 | |
| from app.core.security import verify_token
 | |
| from app.log import logger
 | |
| from app.utils.system import SystemUtils
 | |
| 
 | |
# Router on which the endpoints of this module register themselves.
router = APIRouter()

# File suffixes (lower case, leading dot included) treated as images.
IMAGE_TYPES = [
    ".jpg",
    ".png",
    ".gif",
    ".bmp",
    ".jpeg",
    ".webp",
]
 | |
| 
 | |
| 
 | |
def _to_file_item(item: Path, is_dir: bool) -> Any:
    """
    Build the ``schemas.FileItem`` describing ``item``.

    :param item: path of the entry to describe
    :param is_dir: emit a "dir" item (trailing slash, no size/extension) when True,
                   a "file" item otherwise
    :return: the item, or None when the entry cannot be stat'ed
    """
    try:
        # A single stat() call feeds both size and mtime.
        item_stat = item.stat()
    except OSError as err:
        # One unreadable entry must not abort the whole listing.
        logger.error(f"读取文件信息失败:{item} - {err}")
        return None
    # Always expose forward slashes, also for Windows paths.
    item_path = str(item).replace("\\", "/")
    if is_dir:
        return schemas.FileItem(
            type="dir",
            path=item_path + "/",
            name=item.name,
            basename=item.stem,
            modify_time=item_stat.st_mtime,
        )
    return schemas.FileItem(
        type="file",
        path=item_path,
        name=item.name,
        basename=item.stem,
        extension=item.suffix[1:],
        size=item_stat.st_size,
        modify_time=item_stat.st_mtime,
    )


@router.get("/list", summary="所有目录和文件", response_model=List[schemas.FileItem])
def list_path(path: str, sort: str = 'time', _: schemas.TokenPayload = Depends(verify_token)) -> Any:
    """
    List the directories and files directly under ``path``.

    :param path: directory to list; an empty value or "/" means the root. On
                 Windows the root is rendered as the list of drives. When
                 ``path`` points at a file, only that file is returned.
    :param sort: 'time' sorts by modification time, newest first; any other
                 value sorts by name, ascending
    :param _: token dependency, only present to enforce authentication
    :return: list of ``schemas.FileItem``; empty when ``path`` does not exist
    """
    if not path or path == "/":
        if SystemUtils.is_windows():
            # Windows has no single root: show the drives instead (unsorted).
            partitions = SystemUtils.get_windows_drives() or ["C:/"]
            return [
                schemas.FileItem(
                    type="dir",
                    # Strip any trailing separator first so the "C:/" fallback
                    # does not become "C://".
                    path=partition.rstrip("/\\") + "/",
                    name=partition,
                    basename=partition,
                )
                for partition in partitions
            ]
        path = "/"
    elif not SystemUtils.is_windows() and not path.startswith("/"):
        # Outside Windows every path is treated as absolute.
        path = "/" + path

    path_obj = Path(path)
    if not path_obj.exists():
        logger.error(f"目录不存在:{path}")
        return []

    # A file path yields just that file.
    if path_obj.is_file():
        single = _to_file_item(path_obj, is_dir=False)
        return [single] if single else []

    ret_items = []

    # Sub-directories first.
    for item in SystemUtils.list_sub_directory(path_obj):
        dir_item = _to_file_item(item, is_dir=True)
        if dir_item:
            ret_items.append(dir_item)

    # Then the files with a supported extension (media, subtitles, images, nfo).
    wanted_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + IMAGE_TYPES + [".nfo"]
    for item in SystemUtils.list_sub_files(path_obj, wanted_exts):
        file_item = _to_file_item(item, is_dir=False)
        if file_item:
            ret_items.append(file_item)

    # Sort the combined result.
    if sort == 'time':
        ret_items.sort(key=lambda x: x.modify_time, reverse=True)
    else:
        ret_items.sort(key=lambda x: x.name, reverse=False)
    return ret_items
 | |
| 
 | |
| 
 | |
@router.get("/mkdir", summary="创建目录", response_model=schemas.Response)
def mkdir(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
    """
    Create the directory ``path``, including missing parents.

    :param path: directory to create
    :param _: token dependency, only present to enforce authentication
    :return: ``schemas.Response`` with ``success=False`` when ``path`` is empty,
             already exists, or cannot be created; ``success=True`` otherwise
    """
    if not path:
        return schemas.Response(success=False)
    path_obj = Path(path)
    if path_obj.exists():
        return schemas.Response(success=False)
    try:
        path_obj.mkdir(parents=True, exist_ok=True)
    except OSError as err:
        # Permission problems, a file in the parent chain, etc. are reported
        # like every other failure of this endpoint instead of escaping.
        logger.error(f"创建目录失败:{path} - {err}")
        return schemas.Response(success=False)
    return schemas.Response(success=True)
 | |
| 
 | |
| 
 | |
@router.get("/delete", summary="删除文件或目录", response_model=schemas.Response)
def delete(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
    """
    Delete a file, or a directory together with its content.

    :param path: file or directory to remove
    :param _: token dependency, only present to enforce authentication
    :return: ``schemas.Response``; ``success=True`` when ``path`` is gone
             afterwards (including when it never existed), ``success=False``
             when ``path`` is empty or something could not be removed
    """
    if not path:
        return schemas.Response(success=False)
    path_obj = Path(path)
    if not path_obj.exists():
        # Nothing to delete counts as success.
        return schemas.Response(success=True)
    try:
        if path_obj.is_file() or path_obj.is_symlink():
            # rmtree refuses symlinks, so a link to a directory is unlinked too.
            path_obj.unlink()
        else:
            # Best effort: keep removing as much as possible on errors ...
            shutil.rmtree(path_obj, ignore_errors=True)
    except OSError as err:
        logger.error(f"删除失败:{path} - {err}")
        return schemas.Response(success=False)
    # ... but report honestly whether anything was left behind.
    if path_obj.exists():
        logger.error(f"删除失败:{path}")
        return schemas.Response(success=False)
    return schemas.Response(success=True)
 | |
| 
 | |
| 
 | |
@router.get("/download", summary="下载文件或目录")
def download(path: str, token: str) -> Any:
    """
    Download a file as-is, or a directory packed as a zip archive.

    :param path: file or directory to download
    :param token: access token, passed as a query parameter
    :return: ``FileResponse`` for a file, a zip ``Response`` for a directory,
             ``schemas.Response(success=False)`` when ``path`` is empty or does
             not exist, None when ``verify_token`` returns a falsy value
    """
    # Local import: only this endpoint needs a scratch directory.
    import tempfile

    if not path:
        return schemas.Response(success=False)
    # Authenticate the token.
    # NOTE(review): verify_token is used as a FastAPI dependency elsewhere in
    # this file; confirm it also behaves as expected when called directly.
    if not verify_token(token):
        return None
    path_obj = Path(path)
    if not path_obj.exists():
        return schemas.Response(success=False)
    if path_obj.is_file():
        # Stream the file itself.
        return FileResponse(path_obj)

    # Pack the directory inside a private temporary directory so that the
    # archive can neither collide with files in the working directory nor be
    # left behind when something fails.
    with tempfile.TemporaryDirectory() as tmp_dir:
        # path_obj.name is empty for a root path; fall back to a fixed name so
        # the archive is always created inside tmp_dir.
        archive_base = Path(tmp_dir) / (path_obj.name or "archive")
        archive_file = shutil.make_archive(base_name=str(archive_base),
                                           format="zip",
                                           root_dir=path_obj)
        # Read the archive that was just written, not the directory itself.
        content = Path(archive_file).read_bytes()
    return Response(content=content, media_type="application/zip")
 | |
| 
 | |
| 
 | |
@router.get("/rename", summary="重命名文件或目录", response_model=schemas.Response)
def rename(path: str, new_name: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
    """
    Rename a file or directory, keeping it in its current parent directory.

    :param path: existing file or directory
    :param new_name: new name, resolved relative to the parent of ``path``
    :param _: token dependency, only present to enforce authentication
    :return: ``schemas.Response`` with ``success=False`` when an argument is
             empty, ``path`` does not exist or the rename fails;
             ``success=True`` otherwise
    """
    if not path or not new_name:
        return schemas.Response(success=False)
    path_obj = Path(path)
    if not path_obj.exists():
        return schemas.Response(success=False)
    try:
        path_obj.rename(path_obj.parent / new_name)
    except OSError as err:
        # Report file-system failures like the other failure paths instead of
        # letting them escape as an unhandled exception.
        logger.error(f"重命名失败:{path} -> {new_name} - {err}")
        return schemas.Response(success=False)
    return schemas.Response(success=True)
 | |
| 
 | |
| 
 | |
@router.get("/image", summary="读取图片")
def image(path: str, token: str) -> Any:
    """
    Return the content of an image file.

    :param path: image file to read; its suffix must be listed in IMAGE_TYPES
    :param token: access token, passed as a query parameter
    :return: ``Response`` carrying the file bytes with the media type matching
             the suffix, or None when ``path`` is empty, the token check
             fails, or ``path`` is not an existing image file
    """
    if not path:
        return None
    # Authenticate the token.
    if not verify_token(token):
        return None
    path_obj = Path(path)
    # is_file() is False for missing paths as well as for directories.
    if not path_obj.is_file():
        return None
    # Only serve known image types.
    suffix = path_obj.suffix.lower()
    if suffix not in IMAGE_TYPES:
        return None
    # Announce the real type rather than labelling everything as JPEG.
    media_types = {
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".bmp": "image/bmp",
        ".webp": "image/webp",
    }
    return Response(content=path_obj.read_bytes(),
                    media_type=media_types.get(suffix, "image/jpeg"))
 |