add dashboard apis

This commit is contained in:
jxxghp
2023-07-10 16:42:20 +08:00
parent 90002b6223
commit 15bb043fe8
10 changed files with 223 additions and 8 deletions

View File

@ -1,9 +1,12 @@
import datetime
import os
import platform
import re
import shutil
from pathlib import Path
from typing import List
from typing import List, Union, Tuple
import psutil
from app import schemas
class SystemUtils:
@ -39,7 +42,7 @@ class SystemUtils:
return True if platform.system() == 'Darwin' else False
@staticmethod
def copy(src: Path, dest: Path):
def copy(src: Path, dest: Path) -> Tuple[int, str]:
"""
复制
"""
@ -51,7 +54,7 @@ class SystemUtils:
return -1, str(err)
@staticmethod
def move(src: Path, dest: Path):
def move(src: Path, dest: Path) -> Tuple[int, str]:
"""
移动
"""
@ -63,7 +66,7 @@ class SystemUtils:
return -1, str(err)
@staticmethod
def link(src: Path, dest: Path):
def link(src: Path, dest: Path) -> Tuple[int, str]:
"""
硬链接
"""
@ -75,7 +78,7 @@ class SystemUtils:
return -1, str(err)
@staticmethod
def softlink(src: Path, dest: Path):
def softlink(src: Path, dest: Path) -> Tuple[int, str]:
"""
软链接
"""
@ -105,7 +108,7 @@ class SystemUtils:
return files
@staticmethod
def get_directory_size(path: Path):
def get_directory_size(path: Path) -> float:
"""
计算目录的大小
@ -125,3 +128,74 @@ class SystemUtils:
total_size += path.stat().st_size
return total_size
@staticmethod
def space_usage(dir_list: Union[Path, List[Path]]) -> Tuple[float, float]:
"""
计算多个目录的总可用空间/剩余空间单位Byte并去除重复磁盘
"""
if not dir_list:
return 0.0, 0.0
if not isinstance(dir_list, list):
dir_list = [dir_list]
# 存储不重复的磁盘
disk_set = set()
# 存储总剩余空间
total_free_space = 0.0
# 存储总空间
total_space = 0.0
for dir_path in dir_list:
if not dir_path:
continue
if not dir_path.exists():
continue
# 获取目录所在磁盘
if os.name == "nt":
disk = dir_path.drive
else:
disk = os.stat(dir_path).st_dev
# 如果磁盘未出现过,则计算其剩余空间并加入总剩余空间中
if disk not in disk_set:
disk_set.add(disk)
total_space += SystemUtils.total_space(dir_path)
total_free_space += SystemUtils.free_space(dir_path)
return total_space, total_free_space
@staticmethod
def free_space(path: Path) -> float:
"""
获取指定路径的剩余空间单位Byte
"""
if not os.path.exists(path):
return 0.0
return psutil.disk_usage(str(path)).free
@staticmethod
def total_space(path: Path) -> float:
"""
获取指定路径的总空间单位Byte
"""
if not os.path.exists(path):
return 0.0
return psutil.disk_usage(str(path)).total
@staticmethod
def processes() -> List[schemas.ProcessInfo]:
"""
获取所有进程
"""
processes = []
for proc in psutil.process_iter(['pid', 'name', 'create_time', 'memory_info', 'status']):
try:
if proc.status() != psutil.STATUS_ZOMBIE:
runtime = datetime.datetime.now() - datetime.datetime.fromtimestamp(
int(getattr(proc, 'create_time', 0)()))
mem_info = getattr(proc, 'memory_info', None)()
if mem_info is not None:
mem_mb = round(mem_info.rss / (1024 * 1024), 1)
processes.append(schemas.ProcessInfo(
pid=proc.pid, name=proc.name(), run_time=runtime.seconds, memory=mem_mb
))
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return processes