fix api
This commit is contained in:
@ -7,11 +7,13 @@ from typing import Optional, Tuple, List
|
||||
|
||||
from requests import Response
|
||||
|
||||
from app import schemas
|
||||
from app.core.config import settings
|
||||
from app.db.systemconfig_oper import SystemConfigOper
|
||||
from app.log import logger
|
||||
from app.schemas.types import SystemConfigKey
|
||||
from app.utils.http import RequestUtils
|
||||
from app.utils.string import StringUtils
|
||||
from app.utils.system import SystemUtils
|
||||
|
||||
|
||||
@ -314,7 +316,7 @@ class AliyunHelper:
|
||||
return {}
|
||||
|
||||
def list(self, drive_id: str = None, parent_file_id: str = 'root', list_type: str = None,
|
||||
limit: int = 100, order_by: str = 'updated_at') -> List[dict]:
|
||||
limit: int = 100, order_by: str = 'updated_at', path: str = "/") -> List[schemas.FileItem]:
|
||||
"""
|
||||
浏览文件
|
||||
limit 返回文件数量,默认 50,最大 100
|
||||
@ -330,21 +332,22 @@ class AliyunHelper:
|
||||
# 根目录处理
|
||||
if not drive_id:
|
||||
return [
|
||||
{
|
||||
"file_id": parent_file_id,
|
||||
"drive_id": params.get("resourceDriveId"),
|
||||
"parent_file_id": "root",
|
||||
"type": "folder",
|
||||
"path": "/资源库/",
|
||||
"name": "资源库",
|
||||
}, {
|
||||
"file_id": parent_file_id,
|
||||
"drive_id": params.get("backDriveId"),
|
||||
"parent_file_id": "root",
|
||||
"type": "folder",
|
||||
"path": "/备份盘/",
|
||||
"name": "备份盘",
|
||||
}
|
||||
schemas.FileItem(
|
||||
fileid=parent_file_id,
|
||||
drive_id=params.get("resourceDriveId"),
|
||||
parent_fileid="root",
|
||||
type="folder",
|
||||
path="/资源库/",
|
||||
name="资源库"
|
||||
),
|
||||
schemas.FileItem(
|
||||
fileid=parent_file_id,
|
||||
drive_id=params.get("backDriveId"),
|
||||
parent_fileid="root",
|
||||
type="folder",
|
||||
path="/备份盘/",
|
||||
name="备份盘"
|
||||
)
|
||||
]
|
||||
# 返回数据
|
||||
ret_items = []
|
||||
@ -380,9 +383,20 @@ class AliyunHelper:
|
||||
else:
|
||||
self.__handle_error(res, "浏览文件")
|
||||
break
|
||||
return ret_items
|
||||
return [schemas.FileItem(
|
||||
fileid=fileinfo.get("file_id"),
|
||||
parent_fileid=fileinfo.get("parent_file_id"),
|
||||
type="file",
|
||||
path=f"{path}{fileinfo.get('name')}",
|
||||
name=fileinfo.get("name"),
|
||||
size=fileinfo.get("size"),
|
||||
extension=fileinfo.get("file_extension"),
|
||||
modify_time=StringUtils.str_to_timestamp(fileinfo.get("updated_at")),
|
||||
thumbnail=fileinfo.get("thumbnail"),
|
||||
drive_id=fileinfo.get("drive_id"),
|
||||
) for fileinfo in ret_items]
|
||||
|
||||
def create_folder(self, parent_file_id: str, name: str) -> Optional[dict]:
|
||||
def create_folder(self, parent_file_id: str, name: str, path: str = "/") -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
创建目录
|
||||
"""
|
||||
@ -410,13 +424,14 @@ class AliyunHelper:
|
||||
}
|
||||
"""
|
||||
result = res.json()
|
||||
return {
|
||||
"file_id": result.get("file_id"),
|
||||
"drive_id": result.get("drive_id"),
|
||||
"parent_file_id": result.get("parent_file_id"),
|
||||
"type": result.get("type"),
|
||||
"name": result.get("file_name")
|
||||
}
|
||||
return schemas.FileItem(
|
||||
fileid=result.get("file_id"),
|
||||
drive_id=result.get("drive_id"),
|
||||
parent_fileid=result.get("parent_file_id"),
|
||||
type=result.get("type"),
|
||||
name=result.get("file_name"),
|
||||
path=f"{path}{result.get('file_name')}",
|
||||
)
|
||||
else:
|
||||
self.__handle_error(res, "创建目录")
|
||||
return None
|
||||
@ -439,7 +454,7 @@ class AliyunHelper:
|
||||
self.__handle_error(res, "删除文件")
|
||||
return False
|
||||
|
||||
def detail(self, file_id: str) -> Optional[dict]:
|
||||
def detail(self, file_id: str, path: str = "/") -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
获取文件详情
|
||||
"""
|
||||
@ -452,7 +467,19 @@ class AliyunHelper:
|
||||
"file_id": file_id
|
||||
})
|
||||
if res:
|
||||
return res.json()
|
||||
result = res.json()
|
||||
return schemas.FileItem(
|
||||
fileid=result.get("file_id"),
|
||||
drive_id=result.get("drive_id"),
|
||||
parent_fileid=result.get("parent_file_id"),
|
||||
type=result.get("type"),
|
||||
name=result.get("name"),
|
||||
size=result.get("size"),
|
||||
extension=result.get("file_extension"),
|
||||
modify_time=StringUtils.str_to_timestamp(result.get("updated_at")),
|
||||
thumbnail=result.get("thumbnail"),
|
||||
path=f"{path}{result.get('name')}"
|
||||
)
|
||||
else:
|
||||
self.__handle_error(res, "获取文件详情")
|
||||
return None
|
||||
|
@ -1,12 +1,13 @@
|
||||
import base64
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple, Generator
|
||||
from typing import Optional, Tuple, List
|
||||
|
||||
import oss2
|
||||
import py115
|
||||
from py115 import Cloud
|
||||
from py115.types import LoginTarget, QrcodeSession, QrcodeStatus, Credential, File, DownloadTicket
|
||||
from py115.types import LoginTarget, QrcodeSession, QrcodeStatus, Credential, DownloadTicket
|
||||
|
||||
from app import schemas
|
||||
from app.db.systemconfig_oper import SystemConfigOper
|
||||
from app.log import logger
|
||||
from app.schemas.types import SystemConfigKey
|
||||
@ -137,26 +138,46 @@ class U115Helper(metaclass=Singleton):
|
||||
logger.error(f"获取115存储空间失败:{str(e)}")
|
||||
return None
|
||||
|
||||
def list(self, parent_file_id: str = '0') -> Optional[Generator[File, None, None]]:
|
||||
def list(self, parent_file_id: str = '0', path: str = "/") -> Optional[List[schemas.FileItem]]:
|
||||
"""
|
||||
浏览文件
|
||||
"""
|
||||
if not self.__init_cloud():
|
||||
return None
|
||||
try:
|
||||
return self.cloud.storage().list(dir_id=parent_file_id)
|
||||
items = self.cloud.storage().list(dir_id=parent_file_id)
|
||||
return [schemas.FileItem(
|
||||
fileid=item.file_id,
|
||||
parent_fileid=item.parent_id,
|
||||
type="dir" if item.is_dir else "file",
|
||||
path=f"{path}{item.name}" + "/" if item.is_dir else "",
|
||||
name=item.name,
|
||||
size=item.size,
|
||||
extension=Path(item.name).suffix[1:],
|
||||
modify_time=item.modified_time.timestamp() if item.modified_time else 0,
|
||||
pickcode=item.pickcode
|
||||
) for item in items]
|
||||
except Exception as e:
|
||||
logger.error(f"浏览115文件失败:{str(e)}")
|
||||
return None
|
||||
|
||||
def create_folder(self, parent_file_id: str, name: str) -> Optional[File]:
|
||||
def create_folder(self, parent_file_id: str, name: str, path: str = "/") -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
创建目录
|
||||
"""
|
||||
if not self.__init_cloud():
|
||||
return None
|
||||
try:
|
||||
return self.cloud.storage().make_dir(parent_file_id, name)
|
||||
result = self.cloud.storage().make_dir(parent_file_id, name)
|
||||
return schemas.FileItem(
|
||||
fileid=result.file_id,
|
||||
parent_fileid=result.parent_id,
|
||||
type="dir",
|
||||
path=f"{path}{name}/",
|
||||
name=name,
|
||||
modify_time=result.modified_time.timestamp() if result.modified_time else 0,
|
||||
pickcode=result.pickcode
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"创建115目录失败:{str(e)}")
|
||||
return None
|
||||
|
Reference in New Issue
Block a user