From cbd704373cbcb42ca526e9c171fa23eb4ad6e30d Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sun, 17 Mar 2024 12:57:38 +0800 Subject: [PATCH] try fix cookiecloud --- app/api/servcookie.py | 54 ++++++++++++++++++++++++++++++--------- app/schemas/servcookie.py | 4 +-- app/utils/common.py | 22 +--------------- 3 files changed, 44 insertions(+), 36 deletions(-) diff --git a/app/api/servcookie.py b/app/api/servcookie.py index 32c4648c..c781111f 100644 --- a/app/api/servcookie.py +++ b/app/api/servcookie.py @@ -1,7 +1,8 @@ import gzip import json -import os -from typing import Annotated, Any, Callable, Dict +from hashlib import md5 +from typing import Annotated, Callable +from typing import Any, Dict, Optional from fastapi import APIRouter, Depends, HTTPException, Path, Request, Response from fastapi.responses import PlainTextResponse @@ -9,11 +10,11 @@ from fastapi.routing import APIRoute from app import schemas from app.core.config import settings -from app.utils.common import get_decrypted_cookie_data +from app.log import logger +from app.utils.common import decrypt class GzipRequest(Request): - _body: bytes = b"" async def body(self) -> bytes: @@ -53,18 +54,20 @@ cookie_router = APIRouter(route_class=GzipRoute, @cookie_router.get("/", response_class=PlainTextResponse) def get_root(): - return "Hello World! API ROOT = /cookiecloud" + return "Hello MoviePilot! COOKIECLOUD API ROOT = /cookiecloud" @cookie_router.post("/", response_class=PlainTextResponse) def post_root(): - return "Hello World! API ROOT = /cookiecloud" + return "Hello MoviePilot! COOKIECLOUD API ROOT = /cookiecloud" @cookie_router.post("/update") async def update_cookie(req: schemas.CookieData): - file_path = os.path.join(settings.COOKIE_PATH, - os.path.basename(req.uuid) + ".json") + """ + 上传Cookie数据 + """ + file_path = settings.COOKIE_PATH / f"{req.uuid}.json" content = json.dumps({"encrypted": req.encrypted}) with open(file_path, encoding="utf-8", mode="w") as file: file.write(content) @@ -77,23 +80,50 @@ async def update_cookie(req: schemas.CookieData): def load_encrypt_data(uuid: str) -> Dict[str, Any]: - file_path = os.path.join(settings.COOKIE_PATH, - os.path.basename(uuid) + ".json") + """ + 加载本地加密原始数据 + """ + file_path = settings.COOKIE_PATH / f"{uuid}.json" # 检查文件是否存在 - if not os.path.exists(file_path): + if not file_path.exists(): raise HTTPException(status_code=404, detail="Item not found") # 读取文件 with open(file_path, encoding="utf-8", mode="r") as file: read_content = file.read() - data = json.loads(read_content) + data = json.loads(read_content.encode("utf-8")) return data +def get_decrypted_cookie_data(uuid: str, password: str, + encrypted: str) -> Optional[Dict[str, Any]]: + """ + 加载本地加密数据并解密为Cookie + """ + key_md5 = md5() + key_md5.update((uuid + '-' + password).encode('utf-8')) + aes_key = (key_md5.hexdigest()[:16]).encode('utf-8') + + if encrypted: + try: + decrypted_data = decrypt(encrypted, aes_key).decode('utf-8') + decrypted_data = json.loads(decrypted_data) + if 'cookie_data' in decrypted_data: + return decrypted_data + except Exception as e: + logger.error(f"解密Cookie数据失败:{str(e)}") + return None + else: + return None + + @cookie_router.get("/get/{uuid}") async def get_cookie( uuid: Annotated[str, Path(min_length=5, pattern="^[a-zA-Z0-9]+$")]): + """ + 下载加密数据 + """ return load_encrypt_data(uuid) diff --git a/app/schemas/servcookie.py b/app/schemas/servcookie.py index 7de4e481..13e1239a 100644 --- a/app/schemas/servcookie.py +++ b/app/schemas/servcookie.py @@ -1,12 +1,10 @@ -from typing import Union - from fastapi import Query from pydantic import BaseModel class CookieData(BaseModel): - uuid: str = Query(min_length=5, pattern="^[a-zA-Z0-9]+$") encrypted: str = Query(min_length=1, max_length=1024 * 1024 * 50) + uuid: str = Query(min_length=5, pattern="^[a-zA-Z0-9]+$") class CookiePassword(BaseModel): diff --git a/app/utils/common.py b/app/utils/common.py index beb21b65..5708fbfd 100644 --- a/app/utils/common.py +++ b/app/utils/common.py @@ -1,9 +1,7 @@ import base64 -import json import time - from hashlib import md5 -from typing import Any, Dict, Optional +from typing import Any from Crypto import Random from Crypto.Cipher import AES @@ -84,21 +82,3 @@ def decrypt(encrypted: str | bytes, passphrase: bytes) -> bytes: aes = AES.new(key, AES.MODE_CBC, iv) data = aes.decrypt(encrypted[16:]) return data[:-(data[-1] if type(data[-1]) == int else ord(data[-1]))] - - -def get_decrypted_cookie_data(uuid: str, password: str, - encrypted: str) -> Optional[Dict[str, Any]]: - key_md5 = md5() - key_md5.update((uuid + '-' + password).encode('utf-8')) - aes_key = (key_md5.hexdigest()[:16]).encode('utf-8') - - if encrypted is not None: - try: - decrypted_data = decrypt(encrypted, aes_key).decode('utf-8') - decrypted_data = json.loads(decrypted_data) - if 'cookie_data' in decrypted_data: - return decrypted_data - except Exception as e: - return None - else: - return None