fix 115 pan

This commit is contained in:
jxxghp 2024-06-19 13:02:04 +08:00
parent 7103b0334a
commit 060e2f225c
5 changed files with 196 additions and 103 deletions

View File

@ -1,7 +1,7 @@
from pathlib import Path
from typing import Any, List
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from starlette.responses import Response
from app import schemas
@ -147,7 +147,7 @@ def download_aliyun(fileid: str,
if url:
# 重定向
return Response(status_code=302, headers={"Location": url})
return schemas.Response(success=False)
raise HTTPException(status_code=500, detail="下载文件出错")
@router.get("/rename", summary="重命名文件或目录(阿里云盘)", response_model=schemas.Response)
@ -202,4 +202,4 @@ def image_aliyun(fileid: str, _: schemas.TokenPayload = Depends(verify_uri_token
if url:
# 重定向
return Response(status_code=302, headers={"Location": url})
return schemas.Response(success=False)
raise HTTPException(status_code=500, detail="下载图片出错")

View File

@ -2,7 +2,7 @@ import shutil
from pathlib import Path
from typing import Any, List
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from starlette.responses import FileResponse, Response
from app import schemas
@ -181,7 +181,7 @@ def download_local(path: str, _: schemas.TokenPayload = Depends(verify_uri_token
return schemas.Response(success=False)
path_obj = Path(path)
if not path_obj.exists():
return schemas.Response(success=False)
raise HTTPException(status_code=404, detail="文件不存在")
if path_obj.is_file():
# 做为文件流式下载
return FileResponse(path_obj)
@ -247,5 +247,5 @@ def image_local(path: str, _: schemas.TokenPayload = Depends(verify_uri_token))
return None
# 判断是否图片文件
if path_obj.suffix.lower() not in IMAGE_TYPES:
return None
raise HTTPException(status_code=500, detail="图片读取出错")
return Response(content=path_obj.read_bytes(), media_type="image/jpeg")

View File

@ -1,7 +1,8 @@
import base64
from pathlib import Path
from typing import Any, List
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from starlette.responses import Response
from app import schemas
@ -10,7 +11,7 @@ from app.core.config import settings
from app.core.metainfo import MetaInfoPath
from app.core.security import verify_token, verify_uri_token
from app.helper.u115 import U115Helper
from app.utils.string import StringUtils
from app.utils.http import RequestUtils
router = APIRouter()
@ -20,36 +21,43 @@ def qrcode(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
生成二维码
"""
qrcode_data, errmsg = U115Helper().generate_qrcode()
qrcode_data = U115Helper().generate_qrcode()
if qrcode_data:
return schemas.Response(success=True, data=qrcode_data)
return schemas.Response(success=False, message=errmsg)
return schemas.Response(success=True, data={
'codeContent': qrcode_data
})
return schemas.Response(success=False)
@router.get("/check", summary="二维码登录确认", response_model=schemas.Response)
def check(ck: str, t: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
def check(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
二维码登录确认
"""
if not ck or not t:
return schemas.Response(success=False, message="参数错误")
data, errmsg = U115Helper().check_login(ck, t)
data, errmsg = U115Helper().check_login()
if data:
return schemas.Response(success=True, data=data)
return schemas.Response(success=False, message=errmsg)
@router.get("/userinfo", summary="查询用户信息", response_model=schemas.Response)
def userinfo(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
@router.get("/storage", summary="查询存储空间信息", response_model=schemas.Response)
def storage(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询用户信息
查询存储空间信息
"""
pass
storage_info = U115Helper().get_storage()
if storage_info:
return schemas.Response(success=True, data={
"total": storage_info[0],
"used": storage_info[1]
})
return schemas.Response(success=False)
@router.get("/list", summary="所有目录和文件115网盘", response_model=List[schemas.FileItem])
def list_115(path: str,
fileid: str,
pickcode: str,
filetype: str = "dir",
sort: str = 'updated_at',
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
@ -57,6 +65,7 @@ def list_115(path: str,
查询当前目录下所有目录和文件
:param path: 当前路径
:param fileid: 文件ID
:param pickcode: 115 pickcode
:param filetype: 文件类型
:param sort: 排序方式name:按名称排序time:按修改时间排序
:param _: token
@ -66,37 +75,38 @@ def list_115(path: str,
return []
if not path:
path = "/"
if sort == "time":
sort = "updated_at"
if fileid == "root":
fileid = "0"
if filetype == "file":
fileinfo = U115Helper().get_file_detail(fileid)
if fileinfo:
return [schemas.FileItem(
fileid=fileinfo.get("file_id"),
parent_fileid=fileinfo.get("parent_file_id"),
type="file",
path=f"{path}{fileinfo.get('name')}",
name=fileinfo.get("name"),
size=fileinfo.get("size"),
extension=fileinfo.get("file_extension"),
modify_time=StringUtils.str_to_timestamp(fileinfo.get("updated_at")),
thumbnail=fileinfo.get("thumbnail")
)]
return []
name = Path(path).name
suffix = Path(name).suffix[1:]
return [schemas.FileItem(
fileid=fileid,
type="file",
path=path.rstrip('/'),
name=name,
extension=suffix,
pickcode=pickcode
)]
items = U115Helper().list_files(parent_file_id=fileid)
if not items:
return []
return [schemas.FileItem(
fileid=item.get("file_id"),
parent_fileid=item.get("parent_file_id"),
type="dir" if item.get("type") == "folder" else "file",
path=f"{path}{item.get('name')}" + "/" if item.get("type") == "folder" else "",
name=item.get("name"),
size=item.get("size"),
extension=item.get("file_extension"),
modify_time=StringUtils.str_to_timestamp(item.get("updated_at")),
thumbnail=item.get("thumbnail")
file_list = [schemas.FileItem(
fileid=item.file_id,
parent_fileid=item.parent_id,
type="dir" if item.is_dir else "file",
path=f"{path}{item.name}" + "/" if item.is_dir else "",
name=item.name,
size=item.size,
extension=Path(item.name).suffix[1:],
modify_time=item.modified_time.timestamp() if item.modified_time else 0,
pickcode=item.pickcode
) for item in items]
if sort == "name":
file_list.sort(key=lambda x: x.name)
else:
file_list.sort(key=lambda x: x.modify_time, reverse=True)
return file_list
@router.get("/mkdir", summary="创建目录115网盘", response_model=schemas.Response)
@ -129,17 +139,19 @@ def delete_115(fileid: str,
@router.get("/download", summary="下载文件115网盘")
def download_115(fileid: str,
def download_115(pickcode: str,
_: schemas.TokenPayload = Depends(verify_uri_token)) -> Any:
"""
下载文件或目录
"""
if not fileid:
if not pickcode:
return schemas.Response(success=False)
url = U115Helper().get_download_url(fileid)
if url:
# 重定向
return Response(status_code=302, headers={"Location": url})
ticket = U115Helper().download(pickcode)
if ticket:
# 请求数据,并以文件流的方式返回
res = RequestUtils(headers=ticket.headers).get_res(ticket.url)
if res:
return Response(content=res.content, media_type="application/octet-stream")
return schemas.Response(success=False)
@ -184,15 +196,18 @@ def rename_115(fileid: str, new_name: str, path: str,
return schemas.Response(success=False)
@router.get("/image", summary="读取图片115网盘", response_model=schemas.Response)
def image_115(fileid: str, _: schemas.TokenPayload = Depends(verify_uri_token)) -> Any:
@router.get("/image", summary="读取图片115网盘")
def image_115(pickcode: str, _: schemas.TokenPayload = Depends(verify_uri_token)) -> Any:
"""
读取图片
"""
if not fileid:
if not pickcode:
return schemas.Response(success=False)
url = U115Helper().get_download_url(fileid)
if url:
# 重定向
return Response(status_code=302, headers={"Location": url})
return schemas.Response(success=False)
ticket = U115Helper().download(pickcode)
if ticket:
# 请求数据获取内容编码为图片base64返回
res = RequestUtils(headers=ticket.headers).get_res(ticket.url)
if res:
content_type = res.headers.get("Content-Type")
return Response(content=res.content, media_type=content_type)
raise HTTPException(status_code=500, detail="下载图片出错")

View File

@ -1,10 +1,12 @@
from typing import Optional, Tuple, List
import base64
from typing import Optional, Tuple, Generator
import py115
from py115 import Cloud
from py115.types import LoginTarget, QrcodeSession, QrcodeStatus
from py115.types import LoginTarget, QrcodeSession, QrcodeStatus, Credential, File, DownloadTicket
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.singleton import Singleton
from app.utils.system import SystemUtils
@ -15,32 +17,52 @@ class U115Helper(metaclass=Singleton):
115相关操作
"""
cloud: Cloud = None
session: QrcodeSession = None
cloud: Optional[Cloud] = None
_session: QrcodeSession = None
def __init__(self):
self.systemconfig = SystemConfigOper()
@property
def cookies(self):
def __init_cloud(self) -> bool:
"""
获取115认证参数并初始化参数格式
初始化Cloud
"""
return self.systemconfig.get(SystemConfigKey.User115Params) or {}
credential = self.credential
if not credential:
logger.warn("115未登录请先登录")
return False
try:
if not self.cloud:
self.cloud = py115.connect(credential)
except Exception as err:
logger.error(f"115连接失败请重新扫码登录{str(err)}")
self.clear_credential()
return False
return True
def save_credentail(self, cookies: dict):
@property
def credential(self) -> Optional[Credential]:
"""
获取已保存的115认证参数
"""
cookie_dict = self.systemconfig.get(SystemConfigKey.User115Params)
if not cookie_dict:
return None
return Credential.from_dict(cookie_dict)
def save_credentail(self, credential: Credential):
"""
设置115认证参数
"""
self.systemconfig.set(SystemConfigKey.User115Params, cookies)
self.systemconfig.set(SystemConfigKey.User115Params, credential.to_dict())
def clear_params(self):
def clear_credential(self):
"""
清除115认证参数
"""
self.systemconfig.delete(SystemConfigKey.User115Params)
def generate_qrcode(self) -> Optional[Tuple[dict, str]]:
def generate_qrcode(self) -> Optional[str]:
"""
生成二维码
"""
@ -58,67 +80,99 @@ class U115Helper(metaclass=Singleton):
try:
self.cloud = py115.connect()
self.session = self.cloud.qrcode_login(__get_os)
return self.session.image_data, ""
self._session = self.cloud.qrcode_login(__get_os())
image_bin = self._session.image_data
if not image_bin:
logger.warn("115生成二维码失败未获取到二维码数据")
return None
# 转换为base64图片格式
image_base64 = base64.b64encode(image_bin).decode()
return f"data:image/png;base64,{image_base64}"
except Exception as e:
return None, f"115生成二维码失败{str(e)}"
logger.warn(f"115生成二维码失败{str(e)}")
return None
def check_login(self, ck: str, t: str) -> Optional[Tuple[dict, str]]:
def check_login(self) -> Optional[Tuple[dict, str]]:
"""
二维码登录确认
"""
if not self.session:
return None, "请先生成二维码!"
if not self._session:
return {}, "请先生成二维码!"
try:
status = self.cloud.qrcode_poll(self.session)
if not self.cloud:
return {}, "请先生成二维码!"
status = self.cloud.qrcode_poll(self._session)
if status == QrcodeStatus.Done:
# 确认完成,保存认证信息
self.save_credentail(self.cloud.export_credentail())
result = {
"status": 1,
"tip": "登录成功!"
}
elif status == QrcodeStatus.Waiting:
return {
result = {
"status": 0,
"tip": "等待扫码确认..."
}, ""
"tip": "请使用微信或115客户端扫码"
}
elif status == QrcodeStatus.Expired:
return {
result = {
"status": -1,
"tip": "二维码已过期,请重新刷新!"
}, ""
}
self.cloud = None
elif status == QrcodeStatus.Failed:
return {
result = {
"status": -2,
"tip": "登录失败,请重试!"
}, ""
return None, "登录确认失败!"
}
self.cloud = None
else:
result = {
"status": -3,
"tip": "未知错误,请重试!"
}
self.cloud = None
return result, ""
except Exception as e:
return None, f"115登录确认失败{str(e)}"
return {}, f"115登录确认失败{str(e)}"
def list_files(self, parent_file_id: str = '0') -> List[dict]:
def list_files(self, parent_file_id: str = '0') -> Optional[Generator[File, None, None]]:
"""
浏览文件
"""
cookies = self.cookies
if not cookies:
return []
return self.cloud.storage().list(dir_id=parent_file_id)
if not self.__init_cloud():
return None
try:
return self.cloud.storage().list(dir_id=parent_file_id)
except Exception as e:
logger.error(f"浏览115文件失败{str(e)}")
return None
def create_folder(self, parent_file_id: str, name: str) -> bool:
"""
创建目录
"""
cookies = self.cookies
if not cookies:
if not self.__init_cloud():
return False
return self.cloud.storage().make_dir(parent_file_id, name)
try:
self.cloud.storage().make_dir(parent_file_id, name)
return True
except Exception as e:
logger.error(f"创建115目录失败{str(e)}")
return False
def delete_file(self, file_id: str) -> bool:
"""
删除文件
"""
cookies = self.cookies
if not cookies:
if not self.__init_cloud():
return False
return self.cloud.storage().delete(file_id)
try:
self.cloud.storage().delete(file_id)
return True
except Exception as e:
logger.error(f"删除115文件失败{str(e)}")
return False
def get_file_detail(self, file_id: str) -> Optional[dict]:
"""
@ -130,13 +184,35 @@ class U115Helper(metaclass=Singleton):
"""
重命名文件
"""
cookies = self.cookies
if not cookies:
if not self.__init_cloud():
return False
return self.cloud.storage().rename(file_id, name)
try:
self.cloud.storage().rename(file_id, name)
return True
except Exception as e:
logger.error(f"重命名115文件失败{str(e)}")
return False
def get_download_url(self, file_id: str) -> Optional[str]:
def download(self, pickcode: str) -> Optional[DownloadTicket]:
"""
获取下载链接
"""
pass
if not self.__init_cloud():
return None
try:
return self.cloud.storage().request_download(pickcode)
except Exception as e:
logger.error(f"115下载失败{str(e)}")
return None
def get_storage(self) -> Optional[Tuple[int, int]]:
"""
获取存储空间
"""
if not self.__init_cloud():
return None
try:
return self.cloud.storage().space()
except Exception as e:
logger.error(f"获取115存储空间失败{str(e)}")
return None

View File

@ -26,3 +26,5 @@ class FileItem(BaseModel):
parent_fileid: Optional[str] = None
# 缩略图
thumbnail: Optional[str] = None
# 115 pickcode
pickcode: Optional[str] = None