try fix cookiecloud

This commit is contained in:
jxxghp 2024-03-17 12:57:38 +08:00
parent a05724f664
commit cbd704373c
3 changed files with 44 additions and 36 deletions

View File

@ -1,7 +1,8 @@
import gzip import gzip
import json import json
import os from hashlib import md5
from typing import Annotated, Any, Callable, Dict from typing import Annotated, Callable
from typing import Any, Dict, Optional
from fastapi import APIRouter, Depends, HTTPException, Path, Request, Response from fastapi import APIRouter, Depends, HTTPException, Path, Request, Response
from fastapi.responses import PlainTextResponse from fastapi.responses import PlainTextResponse
@ -9,11 +10,11 @@ from fastapi.routing import APIRoute
from app import schemas from app import schemas
from app.core.config import settings from app.core.config import settings
from app.utils.common import get_decrypted_cookie_data from app.log import logger
from app.utils.common import decrypt
class GzipRequest(Request): class GzipRequest(Request):
_body: bytes = b"" _body: bytes = b""
async def body(self) -> bytes: async def body(self) -> bytes:
@ -53,18 +54,20 @@ cookie_router = APIRouter(route_class=GzipRoute,
@cookie_router.get("/", response_class=PlainTextResponse) @cookie_router.get("/", response_class=PlainTextResponse)
def get_root(): def get_root():
return "Hello World! API ROOT = /cookiecloud" return "Hello MoviePilot! COOKIECLOUD API ROOT = /cookiecloud"
@cookie_router.post("/", response_class=PlainTextResponse) @cookie_router.post("/", response_class=PlainTextResponse)
def post_root(): def post_root():
return "Hello World! API ROOT = /cookiecloud" return "Hello MoviePilot! COOKIECLOUD API ROOT = /cookiecloud"
@cookie_router.post("/update") @cookie_router.post("/update")
async def update_cookie(req: schemas.CookieData): async def update_cookie(req: schemas.CookieData):
file_path = os.path.join(settings.COOKIE_PATH, """
os.path.basename(req.uuid) + ".json") 上传Cookie数据
"""
file_path = settings.COOKIE_PATH / f"{req.uuid}.json"
content = json.dumps({"encrypted": req.encrypted}) content = json.dumps({"encrypted": req.encrypted})
with open(file_path, encoding="utf-8", mode="w") as file: with open(file_path, encoding="utf-8", mode="w") as file:
file.write(content) file.write(content)
@ -77,23 +80,50 @@ async def update_cookie(req: schemas.CookieData):
def load_encrypt_data(uuid: str) -> Dict[str, Any]: def load_encrypt_data(uuid: str) -> Dict[str, Any]:
file_path = os.path.join(settings.COOKIE_PATH, """
os.path.basename(uuid) + ".json") 加载本地加密原始数据
"""
file_path = settings.COOKIE_PATH / f"{uuid}.json"
# 检查文件是否存在 # 检查文件是否存在
if not os.path.exists(file_path): if not file_path.exists():
raise HTTPException(status_code=404, detail="Item not found") raise HTTPException(status_code=404, detail="Item not found")
# 读取文件 # 读取文件
with open(file_path, encoding="utf-8", mode="r") as file: with open(file_path, encoding="utf-8", mode="r") as file:
read_content = file.read() read_content = file.read()
data = json.loads(read_content) data = json.loads(read_content.encode("utf-8"))
return data return data
def get_decrypted_cookie_data(uuid: str, password: str,
encrypted: str) -> Optional[Dict[str, Any]]:
"""
加载本地加密数据并解密为Cookie
"""
key_md5 = md5()
key_md5.update((uuid + '-' + password).encode('utf-8'))
aes_key = (key_md5.hexdigest()[:16]).encode('utf-8')
if encrypted:
try:
decrypted_data = decrypt(encrypted, aes_key).decode('utf-8')
decrypted_data = json.loads(decrypted_data)
if 'cookie_data' in decrypted_data:
return decrypted_data
except Exception as e:
logger.error(f"解密Cookie数据失败{str(e)}")
return None
else:
return None
@cookie_router.get("/get/{uuid}") @cookie_router.get("/get/{uuid}")
async def get_cookie( async def get_cookie(
uuid: Annotated[str, Path(min_length=5, pattern="^[a-zA-Z0-9]+$")]): uuid: Annotated[str, Path(min_length=5, pattern="^[a-zA-Z0-9]+$")]):
"""
下载加密数据
"""
return load_encrypt_data(uuid) return load_encrypt_data(uuid)

View File

@ -1,12 +1,10 @@
from typing import Union
from fastapi import Query from fastapi import Query
from pydantic import BaseModel from pydantic import BaseModel
class CookieData(BaseModel): class CookieData(BaseModel):
uuid: str = Query(min_length=5, pattern="^[a-zA-Z0-9]+$")
encrypted: str = Query(min_length=1, max_length=1024 * 1024 * 50) encrypted: str = Query(min_length=1, max_length=1024 * 1024 * 50)
uuid: str = Query(min_length=5, pattern="^[a-zA-Z0-9]+$")
class CookiePassword(BaseModel): class CookiePassword(BaseModel):

View File

@ -1,9 +1,7 @@
import base64 import base64
import json
import time import time
from hashlib import md5 from hashlib import md5
from typing import Any, Dict, Optional from typing import Any
from Crypto import Random from Crypto import Random
from Crypto.Cipher import AES from Crypto.Cipher import AES
@ -84,21 +82,3 @@ def decrypt(encrypted: str | bytes, passphrase: bytes) -> bytes:
aes = AES.new(key, AES.MODE_CBC, iv) aes = AES.new(key, AES.MODE_CBC, iv)
data = aes.decrypt(encrypted[16:]) data = aes.decrypt(encrypted[16:])
return data[:-(data[-1] if type(data[-1]) == int else ord(data[-1]))] return data[:-(data[-1] if type(data[-1]) == int else ord(data[-1]))]
def get_decrypted_cookie_data(uuid: str, password: str,
encrypted: str) -> Optional[Dict[str, Any]]:
key_md5 = md5()
key_md5.update((uuid + '-' + password).encode('utf-8'))
aes_key = (key_md5.hexdigest()[:16]).encode('utf-8')
if encrypted is not None:
try:
decrypted_data = decrypt(encrypted, aes_key).decode('utf-8')
decrypted_data = json.loads(decrypted_data)
if 'cookie_data' in decrypted_data:
return decrypted_data
except Exception as e:
return None
else:
return None