Merge pull request #1693 from lingjiameng/main

集成CookieCloud服务器端
This commit is contained in:
jxxghp
2024-03-16 17:52:49 +08:00
committed by GitHub
10 changed files with 245 additions and 65 deletions

View File

@ -1,6 +1,7 @@
import os
import json
from typing import Tuple, Optional
from typing import Any, Dict, Tuple, Optional
from hashlib import md5
from app.utils.http import RequestUtils
@ -12,10 +13,12 @@ class CookieCloudHelper:
_ignore_cookies: list = ["CookieAutoDeleteBrowsingDataCleanup", "CookieAutoDeleteCleaningDiscarded"]
def __init__(self, server: str, key: str, password: str):
def __init__(self, server: str, key: str, password: str, enable_local: bool, local_path: str):
self._server = server
self._key = key
self._password = password
self._enable_local = enable_local
self._local_path = local_path
self._req = RequestUtils(content_type="application/json")
def download(self) -> Tuple[Optional[dict], str]:
@ -23,68 +26,78 @@ class CookieCloudHelper:
从CookieCloud下载数据
:return: Cookie数据、错误信息
"""
if not self._server or not self._key or not self._password:
if (not self._server and
not self._enable_local) or not self._key or not self._password:
return None, "CookieCloud参数不正确"
req_url = "%s/get/%s" % (self._server, str(self._key).strip())
ret = self._req.get_res(url=req_url)
if ret and ret.status_code == 200:
result = ret.json()
if not result:
return {}, "未下载到数据"
encrypted = result.get("encrypted")
if not encrypted:
return {}, "未获取到cookie密文"
else:
crypt_key = self.get_crypt_key()
try:
decrypted_data = decrypt(encrypted, crypt_key).decode('utf-8')
result = json.loads(decrypted_data)
except Exception as e:
return {}, "cookie解密失败" + str(e)
result = None
if self._enable_local:
# 开启本地服务时,从本地直接读取数据
result = self.load_local_encrypt_data(self._key)
if not result:
return {}, "cookie解密为空"
if result.get("cookie_data"):
contents = result.get("cookie_data")
else:
contents = result
# 整理数据,使用domain域名的最后两级作为分组依据
domain_groups = {}
for site, cookies in contents.items():
for cookie in cookies:
domain_key = StringUtils.get_url_domain(cookie.get("domain"))
if not domain_groups.get(domain_key):
domain_groups[domain_key] = [cookie]
else:
domain_groups[domain_key].append(cookie)
# 返回错误
ret_cookies = {}
# 索引器
for domain, content_list in domain_groups.items():
if not content_list:
continue
# 只有cf的cookie过滤掉
cloudflare_cookie = True
for content in content_list:
if content["name"] != "cf_clearance":
cloudflare_cookie = False
break
if cloudflare_cookie:
continue
# 站点Cookie
cookie_str = ";".join(
[f"{content.get('name')}={content.get('value')}"
for content in content_list
if content.get("name") and content.get("name") not in self._ignore_cookies]
)
ret_cookies[domain] = cookie_str
return ret_cookies, ""
elif ret:
return None, f"同步CookieCloud失败错误码{ret.status_code}"
return {}, "未从本地CookieCloud服务加载到cookie数据请检查服务器设置、用户KEY及加密密码是否正确"
else:
return None, "CookieCloud请求失败请检查服务器地址、用户KEY及加密密码是否正确"
req_url = "%s/get/%s" % (self._server, str(self._key).strip())
ret = self._req.get_res(url=req_url)
if ret and ret.status_code == 200:
result = ret.json()
if not result:
return {},f"未从{self._server}下载到cookie数据"
elif ret:
return None, f"远程同步CookieCloud失败错误码{ret.status_code}"
else:
return None, "CookieCloud请求失败请检查服务器地址、用户KEY及加密密码是否正确"
encrypted = result.get("encrypted")
if not encrypted:
return {}, "未获取到cookie密文"
else:
crypt_key = self.get_crypt_key()
try:
decrypted_data = decrypt(encrypted, crypt_key).decode('utf-8')
result = json.loads(decrypted_data)
except Exception as e:
return {}, "cookie解密失败" + str(e)
if not result:
return {}, "cookie解密为空"
if result.get("cookie_data"):
contents = result.get("cookie_data")
else:
contents = result
# 整理数据,使用domain域名的最后两级作为分组依据
domain_groups = {}
for site, cookies in contents.items():
for cookie in cookies:
domain_key = StringUtils.get_url_domain(cookie.get("domain"))
if not domain_groups.get(domain_key):
domain_groups[domain_key] = [cookie]
else:
domain_groups[domain_key].append(cookie)
# 返回错误
ret_cookies = {}
# 索引器
for domain, content_list in domain_groups.items():
if not content_list:
continue
# 只有cf的cookie过滤掉
cloudflare_cookie = True
for content in content_list:
if content["name"] != "cf_clearance":
cloudflare_cookie = False
break
if cloudflare_cookie:
continue
# 站点Cookie
cookie_str = ";".join(
[f"{content.get('name')}={content.get('value')}"
for content in content_list
if content.get("name") and content.get("name") not in self._ignore_cookies]
)
ret_cookies[domain] = cookie_str
return ret_cookies, ""
def get_crypt_key(self) -> bytes:
"""
使用UUID和密码生成CookieCloud的加解密密钥
@ -92,3 +105,15 @@ class CookieCloudHelper:
md5_generator = md5()
md5_generator.update((str(self._key).strip() + '-' + str(self._password).strip()).encode('utf-8'))
return (md5_generator.hexdigest()[:16]).encode('utf-8')
def load_local_encrypt_data(self,uuid: str) -> Dict[str, Any]:
file_path = os.path.join(self._local_path, os.path.basename(uuid) + ".json")
# 检查文件是否存在
if not os.path.exists(file_path):
return None
# 读取文件
with open(file_path, encoding="utf-8", mode="r") as file:
read_content = file.read()
data = json.loads(read_content)
return data