MoviePilot/app/db/systemconfig_oper.py
2023-09-12 14:57:31 +08:00

78 lines
2.4 KiB
Python

import json
from typing import Any, Union
from app.db import DbOper, SessionFactory
from app.db.models.systemconfig import SystemConfig
from app.db.plugindata_oper import PluginDataOper
from app.schemas.types import SystemConfigKey
from app.utils.object import ObjectUtils
from app.utils.singleton import Singleton
class SystemConfigOper(DbOper, metaclass=Singleton):
# 配置对象
__SYSTEMCONF: dict = {}
def __init__(self):
"""
加载配置到内存
"""
self._db = SessionFactory()
self._syscomconfig = SystemConfig()
# 插件数据
self.plugindata = PluginDataOper(self._db)
super().__init__(self._db)
for item in self._syscomconfig.list(self._db):
if ObjectUtils.is_obj(item.value):
self.__SYSTEMCONF[item.key] = json.loads(item.value)
else:
self.__SYSTEMCONF[item.key] = item.value
def set(self, key: Union[str, SystemConfigKey], value: Any):
"""
设置系统设置
"""
if isinstance(key, SystemConfigKey):
key = key.value
# 更新内存
self.__SYSTEMCONF[key] = value
# 写入数据库
if ObjectUtils.is_obj(value):
value = json.dumps(value)
elif value is None:
value = ''
conf = self._syscomconfig.get_by_key(self._db, key)
if conf:
if value:
conf.update(self._db, {"value": value})
else:
conf.delete(self._db, conf.id)
else:
conf = SystemConfig(key=key, value=value)
conf.create(self._db)
def get(self, key: Union[str, SystemConfigKey] = None) -> Any:
"""
获取系统设置
"""
if isinstance(key, SystemConfigKey):
key = key.value
if not key:
return self.__SYSTEMCONF
return self.__SYSTEMCONF.get(key)
def delete_by_key(self, key: str = None) -> Any:
"""
删除系统设置
"""
if self.__SYSTEMCONF.get(f"plugin.{key}"):
del self.__SYSTEMCONF[f"plugin.{key}"]
# 删除系统配置
self._syscomconfig.delete_by_key(db=self._db, key=f"plugin.{key}")
# 删除插件数据
self.plugindata.del_data(plugin_id=key)
def __del__(self):
if self._db:
self._db.close()