Merge pull request #1694 from lingjiameng/main

CookieCloud配置支持实时更新
This commit is contained in:
jxxghp 2024-03-16 20:36:05 +08:00 committed by GitHub
commit d917b00055
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 20 additions and 15 deletions

View File

@ -41,11 +41,7 @@ class SiteChain(ChainBase):
self.cookiehelper = CookieHelper() self.cookiehelper = CookieHelper()
self.message = MessageHelper() self.message = MessageHelper()
self.cookiecloud = CookieCloudHelper( self.cookiecloud = CookieCloudHelper(
server=settings.COOKIECLOUD_HOST, settings=settings
key=settings.COOKIECLOUD_KEY,
password=settings.COOKIECLOUD_PASSWORD,
enable_local=settings.COOKIECLOUD_ENABLE_LOCAL,
local_path=settings.COOKIE_PATH
) )
# 特殊站点登录验证 # 特殊站点登录验证

View File

@ -4,6 +4,7 @@ import json
from typing import Any, Dict, Tuple, Optional from typing import Any, Dict, Tuple, Optional
from hashlib import md5 from hashlib import md5
from app.core.config import Settings, settings
from app.utils.http import RequestUtils from app.utils.http import RequestUtils
from app.utils.string import StringUtils from app.utils.string import StringUtils
from app.utils.common import decrypt from app.utils.common import decrypt
@ -13,19 +14,27 @@ class CookieCloudHelper:
_ignore_cookies: list = ["CookieAutoDeleteBrowsingDataCleanup", "CookieAutoDeleteCleaningDiscarded"] _ignore_cookies: list = ["CookieAutoDeleteBrowsingDataCleanup", "CookieAutoDeleteCleaningDiscarded"]
def __init__(self, server: str, key: str, password: str, enable_local: bool, local_path: str): def __init__(self, settings: Settings):
self._server = server self._setting = settings
self._key = key self._sync_setting()
self._password = password
self._enable_local = enable_local
self._local_path = local_path
self._req = RequestUtils(content_type="application/json") self._req = RequestUtils(content_type="application/json")
def _sync_setting(self):
if self._setting:
self._server = settings.COOKIECLOUD_HOST
self._key = settings.COOKIECLOUD_KEY
self._password = settings.COOKIECLOUD_PASSWORD
self._enable_local = settings.COOKIECLOUD_ENABLE_LOCAL
self._local_path = settings.COOKIE_PATH
def download(self) -> Tuple[Optional[dict], str]: def download(self) -> Tuple[Optional[dict], str]:
""" """
从CookieCloud下载数据 从CookieCloud下载数据
:return: Cookie数据错误信息 :return: Cookie数据错误信息
""" """
# 更新为最新设置
self._sync_setting()
if (not self._server and if (not self._server and
not self._enable_local) or not self._key or not self._password: not self._enable_local) or not self._key or not self._password:
return None, "CookieCloud参数不正确" return None, "CookieCloud参数不正确"
@ -33,7 +42,7 @@ class CookieCloudHelper:
result = None result = None
if self._enable_local: if self._enable_local:
# 开启本地服务时,从本地直接读取数据 # 开启本地服务时,从本地直接读取数据
result = self.load_local_encrypt_data(self._key) result = self._load_local_encrypt_data(self._key)
if not result: if not result:
return {}, "未从本地CookieCloud服务加载到cookie数据请检查服务器设置、用户KEY及加密密码是否正确" return {}, "未从本地CookieCloud服务加载到cookie数据请检查服务器设置、用户KEY及加密密码是否正确"
else: else:
@ -52,7 +61,7 @@ class CookieCloudHelper:
if not encrypted: if not encrypted:
return {}, "未获取到cookie密文" return {}, "未获取到cookie密文"
else: else:
crypt_key = self.get_crypt_key() crypt_key = self._get_crypt_key()
try: try:
decrypted_data = decrypt(encrypted, crypt_key).decode('utf-8') decrypted_data = decrypt(encrypted, crypt_key).decode('utf-8')
result = json.loads(decrypted_data) result = json.loads(decrypted_data)
@ -98,7 +107,7 @@ class CookieCloudHelper:
ret_cookies[domain] = cookie_str ret_cookies[domain] = cookie_str
return ret_cookies, "" return ret_cookies, ""
def get_crypt_key(self) -> bytes: def _get_crypt_key(self) -> bytes:
""" """
使用UUID和密码生成CookieCloud的加解密密钥 使用UUID和密码生成CookieCloud的加解密密钥
""" """
@ -106,7 +115,7 @@ class CookieCloudHelper:
md5_generator.update((str(self._key).strip() + '-' + str(self._password).strip()).encode('utf-8')) md5_generator.update((str(self._key).strip() + '-' + str(self._password).strip()).encode('utf-8'))
return (md5_generator.hexdigest()[:16]).encode('utf-8') return (md5_generator.hexdigest()[:16]).encode('utf-8')
def load_local_encrypt_data(self,uuid: str) -> Dict[str, Any]: def _load_local_encrypt_data(self,uuid: str) -> Dict[str, Any]:
file_path = os.path.join(self._local_path, os.path.basename(uuid) + ".json") file_path = os.path.join(self._local_path, os.path.basename(uuid) + ".json")
# 检查文件是否存在 # 检查文件是否存在
if not os.path.exists(file_path): if not os.path.exists(file_path):