add aliyun apis

This commit is contained in:
jxxghp 2024-06-17 19:45:39 +08:00
parent ce1db7f62b
commit 1b6a06bd7b
7 changed files with 457 additions and 19 deletions

View File

@ -2,7 +2,7 @@ from fastapi import APIRouter
from app.api.endpoints import login, user, site, message, webhook, subscribe, \
media, douban, search, plugin, tmdb, history, system, download, dashboard, \
filebrowser, transfer, mediaserver, bangumi
filebrowser, transfer, mediaserver, bangumi, aliyun
api_router = APIRouter()
api_router.include_router(login.router, prefix="/login", tags=["login"])
@ -24,4 +24,4 @@ api_router.include_router(filebrowser.router, prefix="/filebrowser", tags=["file
api_router.include_router(transfer.router, prefix="/transfer", tags=["transfer"])
api_router.include_router(mediaserver.router, prefix="/mediaserver", tags=["mediaserver"])
api_router.include_router(bangumi.router, prefix="/bangumi", tags=["bangumi"])
api_router.include_router(aliyun.router, prefix="/aliyun", tags=["aliyun"])

View File

@ -0,0 +1,30 @@
from typing import Any
from fastapi import APIRouter, Depends
from app import schemas
from app.core.security import verify_token
from app.helper.aliyun import AliyunHelper
router = APIRouter()
@router.get("/qrcode", summary="生成二维码内容", response_model=schemas.Response)
def qrcode(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
生成二维码
"""
qrcode_data, errmsg = AliyunHelper().generate_qrcode()
if qrcode_data:
return schemas.Response(success=True, data=qrcode_data)
return schemas.Response(success=False, message=errmsg)
@router.get("/check", summary="二维码登录确认", response_model=schemas.Response)
def check(ck: str, t: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
if not ck or not t:
return schemas.Response(success=False, message="参数错误")
data, errmsg = AliyunHelper().check_login(ck, t)
if data:
return schemas.Response(success=True, data=data)
return schemas.Response(success=False, message=errmsg)

View File

@ -1,4 +1,5 @@
import shutil
from datetime import datetime
from pathlib import Path
from typing import Any, List
@ -7,8 +8,10 @@ from starlette.responses import FileResponse, Response
from app import schemas
from app.core.config import settings
from app.core.security import verify_token
from app.core.security import verify_token, verify_uri_token
from app.helper.aliyun import AliyunHelper
from app.log import logger
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
router = APIRouter()
@ -16,7 +19,7 @@ router = APIRouter()
IMAGE_TYPES = [".jpg", ".png", ".gif", ".bmp", ".jpeg", ".webp"]
@router.get("/list", summary="所有目录和文件", response_model=List[schemas.FileItem])
@router.get("/local/list", summary="所有目录和文件(本地)", response_model=List[schemas.FileItem])
def list_path(path: str,
sort: str = 'time',
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
@ -98,8 +101,8 @@ def list_path(path: str,
return ret_items
@router.get("/listdir", summary="所有目录(不含文件)", response_model=List[schemas.FileItem])
def list_dir(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
@router.get("/local/listdir", summary="所有目录(本地,不含文件)", response_model=List[schemas.FileItem])
def list_dir(path: str, _: schemas.TokenPayload = Depends(verify_uri_token)) -> Any:
"""
查询当前目录下所有目录
"""
@ -139,7 +142,7 @@ def list_dir(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
return ret_items
@router.get("/mkdir", summary="创建目录", response_model=schemas.Response)
@router.get("/local/mkdir", summary="创建目录(本地)", response_model=schemas.Response)
def mkdir(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
创建目录
@ -153,7 +156,7 @@ def mkdir(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
return schemas.Response(success=True)
@router.get("/delete", summary="删除文件或目录", response_model=schemas.Response)
@router.get("/local/delete", summary="删除文件或目录(本地)", response_model=schemas.Response)
def delete(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
删除文件或目录
@ -170,16 +173,13 @@ def delete(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
return schemas.Response(success=True)
@router.get("/download", summary="下载文件或目录")
def download(path: str, token: str) -> Any:
@router.get("/local/download", summary="下载文件(本地)")
def download(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
下载文件或目录
"""
if not path:
return schemas.Response(success=False)
# 认证token
if not verify_token(token):
return None
path_obj = Path(path)
if not path_obj.exists():
return schemas.Response(success=False)
@ -195,7 +195,7 @@ def download(path: str, token: str) -> Any:
return reponse
@router.get("/rename", summary="重命名文件或目录", response_model=schemas.Response)
@router.get("/local/rename", summary="重命名文件或目录(本地)", response_model=schemas.Response)
def rename(path: str, new_name: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
重命名文件或目录
@ -209,16 +209,13 @@ def rename(path: str, new_name: str, _: schemas.TokenPayload = Depends(verify_to
return schemas.Response(success=True)
@router.get("/image", summary="读取图片")
def image(path: str, token: str) -> Any:
@router.get("/local/image", summary="读取图片(本地)")
def image(path: str, _: schemas.TokenPayload = Depends(verify_uri_token)) -> Any:
"""
读取图片
"""
if not path:
return None
# 认证token
if not verify_token(token):
return None
path_obj = Path(path)
if not path_obj.exists():
return None
@ -228,3 +225,91 @@ def image(path: str, token: str) -> Any:
if path_obj.suffix.lower() not in IMAGE_TYPES:
return None
return Response(content=path_obj.read_bytes(), media_type="image/jpeg")
@router.get("/aliyun/list", summary="所有目录和文件(阿里云盘)", response_model=List[schemas.FileItem])
def list_path(path: str,
fileid: str,
sort: str = 'updated_at',
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询当前目录下所有目录和文件
:param path: 当前路径
:param fileid: 文件ID
:param sort: 排序方式name:按名称排序time:按修改时间排序
:param _: token
:return: 所有目录和文件
"""
if not fileid:
return []
if not path:
path = "/"
if sort == "time":
sort = "updated_at"
items = AliyunHelper().list_files(parent_file_id=fileid, order_by=sort)
if not items:
return []
return [schemas.FileItem(
fileid=item.get("file_id"),
parent_fileid=item.get("parent_file_id"),
type="dir" if item.get("type") == "folder" else "file",
path=f"{path}{item.get('name')}/",
name=item.get("name"),
size=item.get("size"),
extension=item.get("file_extension"),
modify_time=StringUtils.str_to_timestamp(item.get("updated_at"))
) for item in items]
@router.get("/aliyun/listdir", summary="所有目录(阿里云盘,不含文件)", response_model=List[schemas.FileItem])
def list_dir(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询当前目录下所有目录
"""
if not path:
return []
@router.get("/aliyun/mkdir", summary="创建目录(阿里云盘)", response_model=schemas.Response)
def mkdir(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
创建目录
"""
if not path:
return schemas.Response(success=False)
@router.get("/aliyun/delete", summary="删除文件或目录(阿里云盘)", response_model=schemas.Response)
def delete(path: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
删除文件或目录
"""
if not path:
return schemas.Response(success=False)
@router.get("/aliyun/download", summary="下载文件(阿里云盘)")
def download(path: str, _: schemas.TokenPayload = Depends(verify_uri_token)) -> Any:
"""
下载文件或目录
"""
if not path:
return schemas.Response(success=False)
@router.get("/aliyun/rename", summary="重命名文件或目录(阿里云盘)", response_model=schemas.Response)
def rename(path: str, new_name: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
重命名文件或目录
"""
if not path or not new_name:
return schemas.Response(success=False)
@router.get("/aliyun/image", summary="读取图片(阿里云盘)", response_model=schemas.Response)
def image(path: str, _: schemas.TokenPayload = Depends(verify_uri_token)) -> Any:
"""
读取图片
"""
if not path:
return schemas.Response(success=False)

302
app/helper/aliyun.py Normal file
View File

@ -0,0 +1,302 @@
import base64
import datetime
import json
import os
import time
import uuid
from pathlib import Path
from typing import Optional, Tuple, List
from requests import Response
from app.core.config import settings
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
class AliyunHelper:
"""
阿里云相关操作
"""
_X_SIGNATURE = ('f4b7bed5d8524a04051bd2da876dd79afe922b8205226d65855d02b267422adb1'
'e0d8a816b021eaf5c36d101892180f79df655c5712b348c2a540ca136e6b22001')
_X_PUBLIC_KEY = ('04d9d2319e0480c840efeeb75751b86d0db0c5b9e72c6260a1d846958adceaf9d'
'ee789cab7472741d23aafc1a9c591f72e7ee77578656e6c8588098dea1488ac2a')
# 生成二维码
qrcode_url = ("https://passport.aliyundrive.com/newlogin/qrcode/generate.do?"
"appName=aliyun_drive&fromSite=52&appEntrance=web&isMobile=false"
"&lang=zh_CN&returnUrl=&bizParams=&_bx-v=2.0.31")
# 二维码登录确认
check_url = "https://passport.aliyundrive.com/newlogin/qrcode/query.do?appName=aliyun_drive&fromSite=52&_bx-v=2.0.31"
# 更新访问令牌
update_accessstoken_url = "https://auth.aliyundrive.com/v2/account/token"
# 创建会话
create_session_url = "https://api.aliyundrive.com/users/v1/users/device/create_session"
# 浏览文件
list_file_url = "https://api.aliyundrive.com/adrive/v3/file/list"
def __init__(self):
self.systemconfig = SystemConfigOper()
@staticmethod
def __log_error(res: Response, apiname: str):
"""
统一处理和打印错误信息
"""
if res is None:
logger.warn("无法连接到阿里云盘!")
return
result = res.json()
code = result.get("code")
message = result.get("message")
display_message = result.get("display_message")
logger.warn(f"Aliyun {apiname}失败:{code} - {display_message or message}")
@property
def auth_params(self):
"""
获取阿里云盘认证参数并初始化参数格式
"""
return self.systemconfig.get(SystemConfigKey.UserAliyunParams) or {}
def update_params(self, params: dict):
"""
设置阿里云盘认证参数
"""
current_params = self.auth_params
current_params.update(params)
self.systemconfig.set(SystemConfigKey.UserAliyunParams, current_params)
def clear_params(self):
"""
清除阿里云盘认证参数
"""
self.systemconfig.delete(SystemConfigKey.UserAliyunParams)
def generate_qrcode(self) -> Optional[Tuple[dict, str]]:
"""
生成二维码
"""
res = RequestUtils(timeout=10).get_res(self.qrcode_url)
if res:
data = res.json().get("content", {}).get("data")
return {
"codeContent": data.get("codeContent"),
"ck": data.get("ck"),
"t": data.get("t")
}, ""
elif res is not None:
self.__log_error(res, "生成二维码")
return {}, f"请求阿里云盘二维码失败:{res.status_code} - {res.reason}"
return {}, f"请求阿里云盘二维码失败:无法连接!"
def check_login(self, ck: str, t: str) -> Optional[Tuple[dict, str]]:
"""
二维码登录确认
"""
params = {
"t": t,
"ck": ck,
"appName": "aliyun_drive",
"appEntrance": "web",
"isMobile": "false",
"lang": "zh_CN",
"returnUrl": "",
"fromSite": "52",
"bizParams": "",
"navlanguage": "zh-CN",
"navPlatform": "MacIntel",
}
body = "&".join([f"{key}={value}" for key, value in params.items()])
status = {
"NEW": "请用阿里云盘 App 扫码",
"SCANED": "请在手机上确认",
"EXPIRED": "二维码已过期",
"CANCELED": "已取消",
"CONFIRMED": "已确认",
}
headers = {
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
}
res = RequestUtils(headers=headers, timeout=5).post_res(self.check_url, data=body)
if res:
data = res.json().get("content", {}).get("data") or {}
qrCodeStatus = data.get("qrCodeStatus")
data["tip"] = status.get(qrCodeStatus) or "未知"
if data.get("bizExt"):
# base 解码为 json
try:
bizExt = json.loads(base64.b64decode(data["bizExt"]).decode('GBK'))
pds_login_result = bizExt.get("pds_login_result")
if pds_login_result:
data.pop('bizExt')
data.update({
'userId': pds_login_result.get('userId'),
'expiresIn': pds_login_result.get('expiresIn'),
'nickName': pds_login_result.get('nickName'),
'avatar': pds_login_result.get('avatar'),
'tokenType': pds_login_result.get('tokenType'),
"refreshToken": pds_login_result.get('refreshToken'),
"accessToken": pds_login_result.get('accessToken'),
"defaultDriveId": pds_login_result.get('defaultDriveId'),
"updateTime": time.time(),
})
except Exception as e:
return {}, f"bizExt 解码失败:{str(e)}"
return data, ""
elif res is not None:
self.__log_error(res, "登录确认")
return {}, f"阿里云盘登录确认失败:{res.status_code} - {res.reason}"
return {}, "阿里云盘登录确认失败:无法连接!"
def __update_accesstoken(self, refresh_token: str) -> bool:
"""
更新阿里云盘访问令牌
"""
res = RequestUtils(headers={"Content-Type": "application/json"}, timeout=10).post_res(
self.update_accessstoken_url, data={
"refresh_token": refresh_token,
"grant_type": "refresh_token"
})
if res:
data = res.json()
code = data.get("code")
if code in ["RefreshTokenExpired", "InvalidParameter.RefreshToken"]:
logger.warn("刷新令牌已过期,请重新登录!")
return False
self.update_params({
"accessToken": data.get('access_token'),
"expiresIn": data.get('expires_in'),
"updateTime": time.time()
})
logger.info(f"阿里云盘访问令牌已更新accessToken={data.get('access_token')}")
return True
else:
self.__log_error(res, "更新令牌")
return False
def create_session(self, headers: dict):
"""
创建会话
"""
def __os_name():
"""
获取操作系统名称
"""
if SystemUtils.is_windows():
return 'Windows 操作系统'
elif SystemUtils.is_macos():
return 'MacOS 操作系统'
else:
return '类 Unix 操作系统'
res = RequestUtils(headers=headers, timeout=5).post_res(self.create_session_url, json={
'deviceName': f'MoviePilot Web',
'modelName': __os_name(),
'pubKey': self._X_PUBLIC_KEY,
})
self.__log_error(res, "创建会话")
def get_access_params(self) -> Optional[dict]:
"""
获取阿里云盘访问参数如果超时则更新后返回
"""
params = self.auth_params
if not params:
logger.warn("阿里云盘访问令牌不存在,请先扫码登录!")
return None
expires_in = params.get("expiresIn")
update_time = params.get("updateTime")
refresh_token = params.get("refreshToken")
if not expires_in or not update_time or not refresh_token:
logger.warn("阿里云盘访问令牌参数错误,请重新扫码登录!")
self.clear_params()
return None
if (time.time() - update_time) >= expires_in:
logger.info("阿里云盘访问令牌已过期,正在更新...")
if not self.__update_accesstoken(refresh_token):
return None
x_device_id = params.get("x_device_id")
if not x_device_id:
x_device_id = uuid.uuid4().hex
params['x_device_id'] = x_device_id
self.update_params({"x_device_id": x_device_id})
self.create_session(self.get_headers(params))
return params
def get_headers(self, params: dict):
"""
获取请求头
"""
if not params:
return {}
return {
"Authorization": f"Bearer {params.get('accessToken')}",
"Content-Type": "application/json;charset=UTF-8",
"Accept": "application/json, text/plain, */*",
"Referer": "https://www.alipan.com/",
"User-Agent": settings.USER_AGENT,
"X-Canary": "client=web,app=adrive,version=v4.9.0",
"x-device-id": params.get('x_device_id'),
"x-signature": self._X_SIGNATURE
}
def list_files(self, parent_file_id: str = 'root', list_type: str = None,
limit: int = 100, order_by: str = 'updated_at') -> List[dict]:
"""
浏览文件
limit 返回文件数量默认 50最大 100
order_by created_at/updated_at/name/size
parent_file_id 根目录为root
type all | file | folder
"""
params = self.get_access_params()
if not params:
return []
# 最终返回数据
ret_items = []
# 请求头
headers = self.get_headers(params)
# 分页获取
next_marker = None
while True:
if not parent_file_id or parent_file_id == "/":
parent_file_id = "root"
res = RequestUtils(headers=headers, timeout=10).post_res(self.list_file_url, json={
"drive_id": params.get("defaultDriveId"),
"type": list_type,
"limit": limit,
"order_by": order_by,
"parent_file_id": parent_file_id,
"marker": next_marker
}, params={
'jsonmask': ('next_marker,items(name,file_id,drive_id,type,size,created_at,updated_at,'
'category,file_extension,parent_file_id,mime_type,starred,thumbnail,url,'
'streams_info,content_hash,user_tags,user_meta,trashed,video_media_metadata,'
'video_preview_metadata,sync_meta,sync_device_flag,sync_flag,punish_flag')
})
if res:
result = res.json()
items = result.get("items")
if not items:
break
# 合并数据
ret_items.extend(items)
next_marker = result.get("next_marker")
if not next_marker:
# 没有下一页
break
else:
self.__log_error(res, "浏览文件")
break
return ret_items

View File

@ -20,3 +20,7 @@ class FileItem(BaseModel):
modify_time: Optional[float] = None
# 子节点
children: Optional[list] = []
# ID
fileid: Optional[str] = None
# 父ID
parent_fileid: Optional[str] = None

View File

@ -94,6 +94,8 @@ class SystemConfigKey(Enum):
DownloadDirectories = "DownloadDirectories"
# 媒体库目录定义
LibraryDirectories = "LibraryDirectories"
# 阿里云盘认证参数
UserAliyunParams = "UserAliyunParams"
# 处理进度Key字典

View File

@ -383,6 +383,21 @@ class StringUtils:
print(str(e))
return timestamp
@staticmethod
def str_to_timestamp(date_str: str) -> float:
"""
日期转时间戳
:param date_str:
:return:
"""
if not date_str:
return 0
try:
return dateparser.parse(date_str).timestamp()
except Exception as e:
print(str(e))
return 0
@staticmethod
def to_bool(text: str, default_val: bool = False) -> bool:
"""