import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Tuple, Dict, Any

import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from python_hosts import Hosts, HostsEntry

from app.core.config import settings
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType, NotificationType
from app.utils.http import RequestUtils
from app.utils.ip import IpUtils
from app.utils.system import SystemUtils


class CloudflareSpeedTest(_PluginBase):
    """
    Plugin that runs the CloudflareST binary to pick the best Cloudflare IP
    and writes that IP into the configuration of the "CustomHosts" plugin.

    The work is done by a scheduled job: it installs/updates the binary in the
    plugin data directory, runs the speed test, reads the best IP from the
    result file and replaces the previously selected IP in the custom hosts.
    """

    # Plugin name
    plugin_name = "Cloudflare IP优选"
    # Plugin description
    plugin_desc = "🌩 测试 Cloudflare CDN 延迟和速度,自动优选IP。"
    # Plugin icon
    plugin_icon = "cloudflare.jpg"
    # Theme color
    plugin_color = "#F6821F"
    # Plugin version
    plugin_version = "1.0"
    # Plugin author
    plugin_author = "thsrite"
    # Author home page
    author_url = "https://github.com/thsrite"
    # Prefix of the plugin configuration item IDs
    plugin_config_prefix = "cloudflarespeedtest_"
    # Load order
    plugin_order = 12
    # Minimum user level allowed to use the plugin
    auth_level = 1

    # Private attributes
    _customhosts = False
    _cf_ip = None
    _scheduler = None
    _cron = None
    _onlyonce = False
    _ipv4 = False
    _ipv6 = False
    _version = None
    _additional_args = None
    _re_install = False
    _notify = False
    _check = False
    _cf_path = None
    _cf_ipv4 = None
    _cf_ipv6 = None
    _result_file = None
    _release_prefix = 'https://github.com/XIU2/CloudflareSpeedTest/releases/download'
    _binary_name = 'CloudflareST'

    def init_plugin(self, config: dict = None):
        """
        Load the plugin configuration and (re)create the scheduler.

        :param config: saved plugin configuration; when falsy the current
                       attribute values are kept.
        """
        # Stop any job left over from a previous initialisation
        self.stop_service()

        # Read the configuration
        if config:
            self._onlyonce = config.get("onlyonce")
            self._cron = config.get("cron")
            self._cf_ip = config.get("cf_ip")
            self._version = config.get("version")
            self._ipv4 = config.get("ipv4")
            self._ipv6 = config.get("ipv6")
            self._re_install = config.get("re_install")
            self._additional_args = config.get("additional_args")
            self._notify = config.get("notify")
            self._check = config.get("check")

        if self.get_state() or self._onlyonce:
            self._scheduler = BackgroundScheduler(timezone=settings.TZ)

            if self._cron:
                try:
                    trigger = CronTrigger.from_crontab(self._cron)
                except ValueError as err:
                    # An invalid cron expression must not abort plugin
                    # initialisation (the run-once job may still be wanted)
                    logger.error(f"Cloudflare CDN优选周期配置错误:{str(err)}")
                else:
                    logger.info(f"Cloudflare CDN优选服务启动,周期:{self._cron}")
                    self._scheduler.add_job(func=self.__cloudflareSpeedTest,
                                            trigger=trigger,
                                            name="Cloudflare优选")

            if self._onlyonce:
                logger.info(f"Cloudflare CDN优选服务启动,立即运行一次")
                self._scheduler.add_job(func=self.__cloudflareSpeedTest,
                                        trigger='date',
                                        run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
                                        name="Cloudflare优选")
                # Switch the run-once flag back off and persist it
                self._onlyonce = False
                self.__update_config()

            # Start the scheduler only when at least one job was registered
            if self._scheduler.get_jobs():
                self._scheduler.print_jobs()
                self._scheduler.start()

    def __cloudflareSpeedTest(self):
        """
        Run the CloudflareST speed test and write the best IP into the
        CustomHosts plugin configuration.
        """
        self._cf_path = self.get_data_path()
        self._cf_ipv4 = os.path.join(self._cf_path, "ip.txt")
        self._cf_ipv6 = os.path.join(self._cf_path, "ipv6.txt")
        self._result_file = os.path.join(self._cf_path, "result_hosts.txt")

        # The CustomHosts plugin configuration is required; stop without it.
        # `customHosts` is tested first so a missing configuration can never
        # reach `.get()` on None.
        customHosts = self.get_config("CustomHosts")
        self._customhosts = customHosts and customHosts.get("enabled")
        if not customHosts or not customHosts.get("hosts"):
            logger.error(f"Cloudflare CDN优选依赖于自定义Hosts,请先维护hosts")
            return

        if not self._cf_ip:
            logger.error("CloudflareSpeedTest加载成功,首次运行,需要配置优选ip")
            return

        # At least one of ipv4 / ipv6 must be selected; default to ipv4
        if not self._ipv4 and not self._ipv6:
            self._ipv4 = True
            self.__update_config()
            logger.warn(f"Cloudflare CDN优选未指定ip类型,默认ipv4")

        err_flag, release_version = self.__check_envirment()
        if err_flag and release_version:
            # Persist the newly installed version
            self._version = release_version
            self.__update_config()

        hosts = customHosts.get("hosts")
        if isinstance(hosts, str):
            hosts = str(hosts).split('\n')

        # Re-align the selected IP with the custom hosts when requested
        if self._check:
            self.__check_cf_ip(hosts=hosts)

        # Nothing to run when the environment check failed
        if not err_flag:
            return

        logger.info("正在进行CLoudflare CDN优选,请耐心等待")
        # Build the speed test command (-dd skips the download speed test).
        # An unset "additional_args" must not end up as the text "None".
        extra_args = self._additional_args or ''
        cf_command = (
            f'cd {self._cf_path} && chmod a+x {self._binary_name} && '
            f'./{self._binary_name} {extra_args} -o {self._result_file}'
            + (f' -f {self._cf_ipv4}' if self._ipv4 else '')
            + (f' -f {self._cf_ipv6}' if self._ipv6 else '')
        )
        logger.info(f'正在执行优选命令 {cf_command}')
        os.system(cf_command)

        # First column of the second line of the result file is the best IP
        best_ip = SystemUtils.execute("sed -n '2,1p' " + self._result_file + " | awk -F, '{print $1}'")
        logger.info(f"\n获取到最优ip==>[{best_ip}]")

        if not (IpUtils.is_ipv4(best_ip) or IpUtils.is_ipv6(best_ip)):
            logger.error("获取到最优ip格式错误,请重试")
            self._onlyonce = False
            self.__update_config()
            self.stop_service()
            return

        if best_ip == self._cf_ip:
            logger.info(f"CloudflareSpeedTest CDN优选ip未变,不做处理")
            return

        # Replace the old selected IP in the CustomHosts plugin hosts
        err_hosts = customHosts.get("err_hosts")
        new_hosts = []
        for host in hosts:
            if not host or host == '\n':
                continue
            host_arr = str(host).split()
            if not host_arr:
                # Whitespace-only line: nothing to keep or replace
                continue
            if host_arr[0] == self._cf_ip:
                # Only the leading address token is replaced; later
                # occurrences on the same line are left untouched
                new_line = host.replace(self._cf_ip, best_ip, 1)
            else:
                new_line = host
            new_hosts.append(new_line.replace("\n", "") + "\n")

        # Update the CustomHosts plugin configuration
        self.update_config(
            {
                "hosts": ''.join(new_hosts),
                "err_hosts": err_hosts,
                "enabled": True
            }, "CustomHosts"
        )

        # Persist the new selected IP
        old_ip = self._cf_ip
        self._cf_ip = best_ip
        self.__update_config()
        logger.info(f"Cloudflare CDN优选ip [{best_ip}] 已替换自定义Hosts插件")

        # Ask the CustomHosts plugin to reload
        logger.info("通知CustomHosts插件重载 ...")
        self.eventmanager.send_event(EventType.PluginReload,
                                     {
                                         "plugin_id": "CustomHosts"
                                     })
        if self._notify:
            self.post_message(
                mtype=NotificationType.SiteMessage,
                title="【Cloudflare优选任务完成】",
                text=f"原ip:{old_ip}\n"
                     f"新ip:{best_ip}"
            )

    def __check_cf_ip(self, hosts):
        """
        Re-align the selected IP with the custom hosts.

        If exactly one address is the most frequent first token among the
        host lines and it differs from the selected IP, it becomes the
        selected IP. Ties are left alone.

        :param hosts: iterable of host lines ("<address> <name> ...")
        """
        # Count how often each address appears
        ip_count = {}
        for host in hosts:
            fields = str(host).split() if host else []
            if not fields:
                # Empty or whitespace-only line
                continue
            ip_count[fields[0]] = ip_count.get(fields[0], 0) + 1

        if not ip_count:
            return

        # Collect the address(es) with the highest count
        max_count = max(ip_count.values())
        max_ips = [ip for ip, count in ip_count.items() if count == max_count]

        # More than one candidate: do not guess
        if len(max_ips) != 1:
            return

        if max_ips[0] != self._cf_ip:
            self._cf_ip = max_ips[0]
            logger.info(f"获取到自定义hosts插件中ip {max_ips[0]} 出现次数最多,已自动校正优选ip")

    def __check_envirment(self):
        """
        Check the runtime environment and install/update CloudflareST when
        needed.

        :return: tuple ``(ok, version)``; ``ok`` tells whether a usable
                 binary is available, ``version`` is the version that was
                 just installed or None when nothing was installed.
        """
        # Whether an installation is required
        install_flag = False

        # Forced re-install
        if self._re_install:
            install_flag = True
            os.system(f'rm -rf {self._cf_path}')
            logger.info(f'删除CloudflareSpeedTest目录 {self._cf_path},开始重新安装')

        # Make sure the working directory exists
        cf_path = Path(self._cf_path)
        cf_path.mkdir(parents=True, exist_ok=True)
        binary_path = cf_path / self._binary_name

        # Look up the latest CloudflareSpeedTest release
        release_version = self.__get_release_version()
        if not release_version:
            # Lookup failed: keep running with an existing binary, otherwise
            # fall back to the last known / default version
            if binary_path.exists():
                logger.warn(f"获取CloudflareSpeedTest版本失败,存在可执行版本,继续运行")
                return True, None
            elif self._version:
                logger.error(f"获取CloudflareSpeedTest版本失败,获取上次运行版本{self._version},开始安装")
                # Install the last known version (the download URL needs it)
                release_version = self._version
                install_flag = True
            else:
                release_version = "v2.2.2"
                self._version = release_version
                logger.error(f"获取CloudflareSpeedTest版本失败,获取默认版本{release_version},开始安装")
                install_flag = True

        # A newer release is available
        if not install_flag and release_version != self._version:
            logger.info(f"检测到CloudflareSpeedTest有版本[{release_version}]更新,开始安装")
            install_flag = True

        # Version is recorded but the local binary is missing
        if not install_flag and release_version == self._version and not binary_path.exists():
            logger.warn(f"未检测到CloudflareSpeedTest本地版本,重新安装")
            install_flag = True

        if not install_flag:
            logger.info(f"CloudflareSpeedTest无新版本,存在可执行版本,继续运行")
            return True, None

        # Platform specific installation
        if SystemUtils.is_windows():
            # todo: Windows is not supported yet
            logger.error(f"CloudflareSpeedTest暂不支持windows平台")
            return False, None

        uname = SystemUtils.execute('uname -m')
        arch = 'amd64' if uname == 'x86_64' else 'arm64'
        if SystemUtils.is_macos():
            # macOS
            cf_file_name = f'CloudflareST_darwin_{arch}.zip'
            unzip_command = f"ditto -V -x -k --sequesterRsrc {self._cf_path}/{cf_file_name} {self._cf_path}"
        else:
            # docker / linux
            cf_file_name = f'CloudflareST_linux_{arch}.tar.gz'
            unzip_command = f"tar -zxf {self._cf_path}/{cf_file_name} -C {self._cf_path}"
        download_url = f'{self._release_prefix}/{release_version}/{cf_file_name}'
        return self.__os_install(download_url, cf_file_name, release_version, unzip_command)

    def __remove_cf_dir(self):
        """
        Best-effort removal of the working directory after a failed install.

        ``os.removedirs`` only removes empty directories; a failure here must
        not mask the install result, so it is logged and ignored.
        """
        try:
            os.removedirs(self._cf_path)
        except OSError as err:
            logger.warn(f"删除CloudflareSpeedTest目录失败:{str(err)}")

    def __os_install(self, download_url, cf_file_name, release_version, unzip_command):
        """
        Download and unpack CloudflareST on macOS / docker.

        :param download_url: URL of the release archive
        :param cf_file_name: file name of the archive inside the work dir
        :param release_version: version being installed
        :param unzip_command: shell command that unpacks the archive
        :return: tuple ``(ok, version)`` as for the environment check
        """
        archive_path = Path(f'{self._cf_path}/{cf_file_name}')
        binary_path = Path(f'{self._cf_path}/{self._binary_name}')

        # A manually downloaded archive is used as-is
        if not archive_path.exists():
            # First download, or download of a new release
            proxies = settings.PROXY
            https_proxy = proxies.get("https") if proxies and proxies.get("https") else None
            if https_proxy:
                os.system(
                    f'wget -P {self._cf_path} --no-check-certificate -e use_proxy=yes -e https_proxy={https_proxy} {download_url}')
            else:
                os.system(f'wget -P {self._cf_path} https://ghproxy.com/{download_url}')

        if not archive_path.exists():
            # Download failed: keep running with an existing binary if any
            if binary_path.exists():
                logger.warn(f"CloudflareSpeedTest安装失败,存在可执行版本,继续运行")
                return True, None
            logger.error(f"CloudflareSpeedTest安装失败,无可用版本,停止运行")
            self.__remove_cf_dir()
            return False, None

        try:
            # Unpack, then drop the archive
            os.system(f'{unzip_command}')
            os.system(f'rm -rf {self._cf_path}/{cf_file_name}')
            if binary_path.exists():
                logger.info(f"CloudflareSpeedTest安装成功,当前版本:{release_version}")
                return True, release_version
            logger.error(f"CloudflareSpeedTest安装失败,请检查")
            self.__remove_cf_dir()
            return False, None
        except Exception as err:
            # Upgrade failed: keep running with an existing binary if any
            if binary_path.exists():
                logger.error(f"CloudflareSpeedTest安装失败:{str(err)},继续使用现版本运行")
                return True, None
            logger.error(f"CloudflareSpeedTest安装失败:{str(err)},无可用版本,停止运行")
            self.__remove_cf_dir()
            return False, None

    @staticmethod
    def __get_release_version():
        """
        Fetch the tag name of the latest CloudflareSpeedTest release.

        :return: the tag name as a string, or None when it cannot be fetched
        """
        api_url = "https://api.github.com/repos/XIU2/CloudflareSpeedTest/releases/latest"
        version_res = RequestUtils().get_res(api_url)
        if not version_res:
            # Retry through the configured proxy
            version_res = RequestUtils(proxies=settings.PROXY).get_res(api_url)
        if not version_res:
            return None
        try:
            tag_name = version_res.json().get("tag_name")
        except (ValueError, AttributeError) as err:
            # Body is not JSON or not a JSON object
            logger.error(f"获取CloudflareSpeedTest版本失败:{str(err)}")
            return None
        return f"{tag_name}" if tag_name else None

    def __update_config(self):
        """
        Persist the current plugin settings ("onlyonce" is always saved off).
        """
        self.update_config({
            "onlyonce": False,
            "cron": self._cron,
            "cf_ip": self._cf_ip,
            "version": self._version,
            "ipv4": self._ipv4,
            "ipv6": self._ipv6,
            "re_install": self._re_install,
            "additional_args": self._additional_args,
            "notify": self._notify,
            "check": self._check
        })

    def get_state(self) -> bool:
        """
        Return True when both a selected IP and a cron expression are set.
        """
        return bool(self._cf_ip and self._cron)

    @staticmethod
    def get_command() -> List[Dict[str, Any]]:
        """
        No remote commands are provided by this plugin.
        """
        pass

    def get_api(self) -> List[Dict[str, Any]]:
        """
        No API endpoints are provided by this plugin.
        """
        pass

    def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
        """
        Build the plugin configuration page.

        :return: two items: 1. the page layout; 2. the default data model
        """

        def column(component: str, props: dict, md: int = 4) -> dict:
            # One grid column holding a single form component; `md` falsy
            # means a full-width column without the 'md' property
            col_props = {'cols': 12}
            if md:
                col_props['md'] = md
            return {
                'component': 'VCol',
                'props': col_props,
                'content': [
                    {
                        'component': component,
                        'props': props
                    }
                ]
            }

        def row(*columns: dict) -> dict:
            # One grid row
            return {
                'component': 'VRow',
                'content': list(columns)
            }

        return [
            {
                'component': 'VForm',
                'content': [
                    row(
                        column('VTextField', {
                            'model': 'cf_ip',
                            'label': '优选IP',
                            'placeholder': '121.121.121.121'
                        }),
                        column('VTextField', {
                            'model': 'cron',
                            'label': '优选周期',
                            'placeholder': '0 0 0 ? *'
                        }),
                        column('VTextField', {
                            'model': 'version',
                            'readonly': True,
                            'label': 'CloudflareSpeedTest版本',
                            'placeholder': '暂未安装'
                        })
                    ),
                    row(
                        column('VSwitch', {
                            'model': 'ipv4',
                            'label': 'IPv4',
                        }),
                        column('VSwitch', {
                            'model': 'ipv6',
                            'label': 'IPv6',
                        }),
                        column('VSwitch', {
                            'model': 'check',
                            'label': '自动校准',
                        })
                    ),
                    row(
                        column('VSwitch', {
                            'model': 'onlyonce',
                            'label': '立即运行一次',
                        }),
                        column('VSwitch', {
                            'model': 're_install',
                            'label': '重装后运行',
                        }),
                        column('VSwitch', {
                            'model': 'notify',
                            'label': '运行时通知',
                        })
                    ),
                    row(
                        column('VTextField', {
                            'model': 'additional_args',
                            'label': '高级参数',
                            'placeholder': '-dd'
                        }, md=0)
                    )
                ]
            }
        ], {
            "cf_ip": "",
            "cron": "",
            "version": "",
            "ipv4": True,
            "ipv6": False,
            "check": False,
            "onlyonce": False,
            "re_install": False,
            "notify": True,
            "additional_args": ""
        }

    def get_page(self) -> List[dict]:
        """
        No detail page is provided by this plugin.
        """
        pass

    @staticmethod
    def __read_system_hosts():
        """
        Load the system hosts file.

        :return: a ``Hosts`` object for the platform's hosts file
        """
        # Locate the hosts file of this machine
        if SystemUtils.is_windows():
            hosts_path = r"c:\windows\system32\drivers\etc\hosts"
        else:
            hosts_path = '/etc/hosts'
        # Read the system hosts
        return Hosts(path=hosts_path)

    def __add_hosts_to_system(self, hosts):
        """
        Write the given host lines into the system hosts file.

        Entries after the "# CustomHostsPlugin" marker comment are dropped
        and replaced by the new ones.

        :param hosts: iterable of host lines ("<address> <name> ...")
        :return: tuple ``(err_flag, err_hosts)``; ``err_flag`` is True when
                 writing the file failed, ``err_hosts`` lists the lines that
                 could not be converted.
        """
        # System hosts object
        system_hosts = self.__read_system_hosts()
        # Keep only the entries that precede the plugin marker
        orgin_entries = []
        for entry in system_hosts.entries:
            if entry.entry_type == "comment" and entry.comment == "# CustomHostsPlugin":
                break
            orgin_entries.append(entry)
        system_hosts.entries = orgin_entries
        # New valid hosts
        new_entrys = []
        # New invalid hosts
        err_hosts = []
        err_flag = False
        for host in hosts:
            if not host:
                continue
            host_arr = str(host).split()
            try:
                host_entry = HostsEntry(entry_type='ipv4' if IpUtils.is_ipv4(str(host_arr[0])) else 'ipv6',
                                        address=host_arr[0],
                                        names=host_arr[1:])
                new_entrys.append(host_entry)
            except Exception as err:
                err_hosts.append(host + "\n")
                logger.error(f"{host} 格式转换错误:{str(err)}")
                # Push a real-time message
                self.systemmessage.put(f"{host} 格式转换错误:{str(err)}")

        # Write the system hosts
        if new_entrys:
            try:
                # Add the marker comment
                system_hosts.add([HostsEntry(entry_type='comment', comment="# CustomHostsPlugin")])
                # Add the new hosts
                system_hosts.add(new_entrys)
                system_hosts.write()
                logger.info("更新系统hosts文件成功")
            except Exception as err:
                err_flag = True
                logger.error(f"更新系统hosts文件失败:{str(err) or '请检查权限'}")
                # Push a real-time message
                self.systemmessage.put(f"更新系统hosts文件失败:{str(err) or '请检查权限'}")
        return err_flag, err_hosts

    def stop_service(self):
        """
        Stop the plugin: remove all jobs and shut the scheduler down.
        """
        try:
            if self._scheduler:
                self._scheduler.remove_all_jobs()
                if self._scheduler.running:
                    self._scheduler.shutdown()
                self._scheduler = None
        except Exception as e:
            logger.error("退出插件失败:%s" % str(e))