import base64
from typing import Tuple, Optional, Union
from urllib.parse import urljoin

from lxml import etree

from app.chain import ChainBase
from app.chain.site import SiteChain
from app.core.config import settings
from app.db.siteicon_oper import SiteIconOper
from app.db.site_oper import SiteOper
from app.helper.cookiecloud import CookieCloudHelper
from app.helper.message import MessageHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.utils.http import RequestUtils


class CookieCloudChain(ChainBase):
    """
    CookieCloud processing chain
    """

    def __init__(self):
        super().__init__()
        self.siteoper = SiteOper()
        self.siteiconoper = SiteIconOper()
        self.siteshelper = SitesHelper()
        self.sitechain = SiteChain()
        self.message = MessageHelper()
        self.cookiecloud = CookieCloudHelper(
            server=settings.COOKIECLOUD_HOST,
            key=settings.COOKIECLOUD_KEY,
            password=settings.COOKIECLOUD_PASSWORD
        )

    def remote_sync(self, userid: Union[int, str]):
        """
        Trigger a site sync remotely and report the result via message
        """
        self.post_message(title="Starting CookieCloud site sync ...", userid=userid)
        # Run the sync
        success, msg = self.process()
        if success:
            self.post_message(title=f"Site sync succeeded: {msg}", userid=userid)
        else:
            self.post_message(title=f"Site sync failed: {msg}", userid=userid)

    def process(self, manual: bool = False) -> Tuple[bool, str]:
        """
        Sync site cookies from CookieCloud
        """
        logger.info("Starting CookieCloud site sync ...")
        cookies, msg = self.cookiecloud.download()
        if not cookies:
            logger.error(f"CookieCloud sync failed: {msg}")
            return False, msg
        # Update cookies for existing sites or add new ones
        _update_count = 0
        _add_count = 0
        for domain, cookie in cookies.items():
            # Look up the indexer for this domain
            indexer = self.siteshelper.get_indexer(domain)
            if self.siteoper.exists(domain):
                # Skip sites that are still reachable with their current cookie
                status, msg = self.sitechain.test(domain)
                if status:
                    logger.info(f"Site [{indexer.get('name') if indexer else domain}] is reachable, "
                                f"skipping CookieCloud data")
                    continue
                # Update the stored cookie
                self.siteoper.update_cookie(domain=domain, cookies=cookie)
                _update_count += 1
            elif indexer:
                # Add a new site
                self.siteoper.add(name=indexer.get("name"),
                                  url=indexer.get("domain"),
                                  domain=domain,
                                  cookie=cookie,
                                  public=1 if indexer.get("public") else 0)
                _add_count += 1
            # Cache the site icon if it is not already stored
            if indexer:
                site_icon = self.siteiconoper.get_by_domain(domain)
                if not site_icon or not site_icon.base64:
                    logger.info(f"Caching icon for site {indexer.get('name')} ...")
                    icon_url, icon_base64 = self.__parse_favicon(url=indexer.get("domain"),
                                                                 cookie=cookie,
                                                                 ua=settings.USER_AGENT)
                    if icon_url:
                        self.siteiconoper.update_icon(name=indexer.get("name"),
                                                      domain=domain,
                                                      icon_url=icon_url,
                                                      icon_base64=icon_base64)
                        logger.info(f"Cached icon for site {indexer.get('name')}")
                    else:
                        logger.warn(f"Failed to cache icon for site {indexer.get('name')}")
        # Done
        ret_msg = f"Updated {_update_count} site(s), added {_add_count} site(s)"
        if manual:
            self.message.put(f"CookieCloud sync succeeded: {ret_msg}")
        logger.info(f"CookieCloud sync succeeded: {ret_msg}")
        return True, ret_msg

    @staticmethod
    def __parse_favicon(url: str, cookie: str, ua: str) -> Tuple[str, Optional[str]]:
        """
        Resolve a site's favicon and return it as base64
        :param url: site URL
        :param cookie: Cookie
        :param ua: User-Agent
        :return: favicon URL and base64-encoded icon (None if it could not be fetched)
        """
        favicon_url = urljoin(url, "favicon.ico")
        res = RequestUtils(cookies=cookie, timeout=60, ua=ua).get_res(url=url)
        if res:
            html_text = res.text
        else:
            logger.error(f"Failed to fetch site page: {url}")
            return favicon_url, None
        html = etree.HTML(html_text)
        if html:
            # Prefer the favicon declared in the page head, if any
            fav_link = html.xpath('//head/link[contains(@rel, "icon")]/@href')
            if fav_link:
                favicon_url = urljoin(url, fav_link[0])
        res = RequestUtils(cookies=cookie, timeout=20, ua=ua).get_res(url=favicon_url)
        if res:
            return favicon_url, base64.b64encode(res.content).decode()
        else:
            logger.error(f"Failed to fetch site icon: {favicon_url}")
        return favicon_url, None
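

# A minimal usage sketch (not part of the original module): assuming the
# MoviePilot runtime has loaded `settings` (COOKIECLOUD_HOST/KEY/PASSWORD) and
# initialized the database layers behind SiteOper and SiteIconOper, the chain
# could be driven directly like this:
#
#     chain = CookieCloudChain()
#     ok, summary = chain.process(manual=True)
#     logger.info(f"CookieCloud sync result: ok={ok}, {summary}")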