This commit is contained in:
jxxghp 2024-03-16 20:40:02 +08:00
parent d917b00055
commit e4b90ca8f7
2 changed files with 20 additions and 26 deletions

View File

@ -40,9 +40,7 @@ class SiteChain(ChainBase):
self.rsshelper = RssHelper() self.rsshelper = RssHelper()
self.cookiehelper = CookieHelper() self.cookiehelper = CookieHelper()
self.message = MessageHelper() self.message = MessageHelper()
self.cookiecloud = CookieCloudHelper( self.cookiecloud = CookieCloudHelper()
settings=settings
)
# 特殊站点登录验证 # 特殊站点登录验证
self.special_site_test = { self.special_site_test = {

View File

@ -1,31 +1,27 @@
import os
import json import json
import os
from typing import Any, Dict, Tuple, Optional
from hashlib import md5 from hashlib import md5
from typing import Any, Dict, Tuple, Optional
from app.core.config import Settings, settings from app.core.config import settings
from app.utils.common import decrypt
from app.utils.http import RequestUtils from app.utils.http import RequestUtils
from app.utils.string import StringUtils from app.utils.string import StringUtils
from app.utils.common import decrypt
class CookieCloudHelper: class CookieCloudHelper:
_ignore_cookies: list = ["CookieAutoDeleteBrowsingDataCleanup", "CookieAutoDeleteCleaningDiscarded"] _ignore_cookies: list = ["CookieAutoDeleteBrowsingDataCleanup", "CookieAutoDeleteCleaningDiscarded"]
def __init__(self, settings: Settings): def __init__(self):
self._setting = settings
self._sync_setting() self._sync_setting()
self._req = RequestUtils(content_type="application/json") self._req = RequestUtils(content_type="application/json")
def _sync_setting(self): def _sync_setting(self):
if self._setting: self._server = settings.COOKIECLOUD_HOST
self._server = settings.COOKIECLOUD_HOST self._key = settings.COOKIECLOUD_KEY
self._key = settings.COOKIECLOUD_KEY self._password = settings.COOKIECLOUD_PASSWORD
self._password = settings.COOKIECLOUD_PASSWORD self._enable_local = settings.COOKIECLOUD_ENABLE_LOCAL
self._enable_local = settings.COOKIECLOUD_ENABLE_LOCAL self._local_path = settings.COOKIE_PATH
self._local_path = settings.COOKIE_PATH
def download(self) -> Tuple[Optional[dict], str]: def download(self) -> Tuple[Optional[dict], str]:
""" """
@ -35,11 +31,11 @@ class CookieCloudHelper:
# 更新为最新设置 # 更新为最新设置
self._sync_setting() self._sync_setting()
if (not self._server and if ((not self._server and not self._enable_local)
not self._enable_local) or not self._key or not self._password: or not self._key
or not self._password):
return None, "CookieCloud参数不正确" return None, "CookieCloud参数不正确"
result = None
if self._enable_local: if self._enable_local:
# 开启本地服务时,从本地直接读取数据 # 开启本地服务时,从本地直接读取数据
result = self._load_local_encrypt_data(self._key) result = self._load_local_encrypt_data(self._key)
@ -51,7 +47,7 @@ class CookieCloudHelper:
if ret and ret.status_code == 200: if ret and ret.status_code == 200:
result = ret.json() result = ret.json()
if not result: if not result:
return {},f"未从{self._server}下载到cookie数据" return {}, f"未从{self._server}下载到cookie数据"
elif ret: elif ret:
return None, f"远程同步CookieCloud失败错误码{ret.status_code}" return None, f"远程同步CookieCloud失败错误码{ret.status_code}"
else: else:
@ -101,8 +97,8 @@ class CookieCloudHelper:
# 站点Cookie # 站点Cookie
cookie_str = ";".join( cookie_str = ";".join(
[f"{content.get('name')}={content.get('value')}" [f"{content.get('name')}={content.get('value')}"
for content in content_list for content in content_list
if content.get("name") and content.get("name") not in self._ignore_cookies] if content.get("name") and content.get("name") not in self._ignore_cookies]
) )
ret_cookies[domain] = cookie_str ret_cookies[domain] = cookie_str
return ret_cookies, "" return ret_cookies, ""
@ -115,7 +111,7 @@ class CookieCloudHelper:
md5_generator.update((str(self._key).strip() + '-' + str(self._password).strip()).encode('utf-8')) md5_generator.update((str(self._key).strip() + '-' + str(self._password).strip()).encode('utf-8'))
return (md5_generator.hexdigest()[:16]).encode('utf-8') return (md5_generator.hexdigest()[:16]).encode('utf-8')
def _load_local_encrypt_data(self,uuid: str) -> Dict[str, Any]: def _load_local_encrypt_data(self, uuid: str) -> Dict[str, Any]:
file_path = os.path.join(self._local_path, os.path.basename(uuid) + ".json") file_path = os.path.join(self._local_path, os.path.basename(uuid) + ".json")
# 检查文件是否存在 # 检查文件是否存在
if not os.path.exists(file_path): if not os.path.exists(file_path):