fix requests

This commit is contained in:
jxxghp
2024-04-10 22:16:10 +08:00
parent 294b4a6bf9
commit a3603f79c8
15 changed files with 567 additions and 556 deletions

View File

@ -20,11 +20,11 @@ class BangumiModule(_ModuleBase):
"""
测试模块连接性
"""
ret = RequestUtils().get_res("https://api.bgm.tv/")
if ret and ret.status_code == 200:
return True, ""
elif ret:
return False, f"无法连接Bangumi错误码{ret.status_code}"
with RequestUtils().get_res("https://api.bgm.tv/") as ret:
if ret and ret.status_code == 200:
return True, ""
elif ret:
return False, f"无法连接Bangumi错误码{ret.status_code}"
return False, "Bangumi网络连接失败"
def init_setting(self) -> Tuple[str, Union[str, bool]]:

View File

@ -30,17 +30,17 @@ class DoubanModule(_ModuleBase):
self.cache = DoubanCache()
def stop(self):
pass
self.doubanapi.close()
def test(self) -> Tuple[bool, str]:
"""
测试模块连接性
"""
ret = RequestUtils().get_res("https://movie.douban.com/")
if ret and ret.status_code == 200:
return True, ""
elif ret:
return False, f"无法连接豆瓣,错误码:{ret.status_code}"
with RequestUtils().get_res("https://movie.douban.com/") as ret:
if ret and ret.status_code == 200:
return True, ""
elif ret:
return False, f"无法连接豆瓣,错误码:{ret.status_code}"
return False, "豆瓣网络连接失败"
def init_setting(self) -> Tuple[str, Union[str, bool]]:

View File

@ -210,7 +210,7 @@ class DoubanApi(metaclass=Singleton):
},
data={
"apikey": "0ab215a8b1977939201640fa14c66bab",
},
}
)
"""
req_url = self._api_url + url
@ -481,6 +481,6 @@ class DoubanApi(metaclass=Singleton):
"""
self.__invoke.cache_clear()
def __del__(self):
def close(self):
if self._session:
self._session.close()

View File

@ -193,15 +193,15 @@ class DoubanScraper:
url = url.replace("/format/webp", "/format/jpg")
file_path.with_suffix(".jpg")
logger.info(f"正在下载{file_path.stem}图片:{url} ...")
r = RequestUtils().get_res(url=url)
if r:
if self._transfer_type in ['rclone_move', 'rclone_copy']:
self.__save_remove_file(file_path, r.content)
with RequestUtils().get_res(url=url) as r:
if r:
if self._transfer_type in ['rclone_move', 'rclone_copy']:
self.__save_remove_file(file_path, r.content)
else:
file_path.write_bytes(r.content)
logger.info(f"图片已保存:{file_path}")
else:
file_path.write_bytes(r.content)
logger.info(f"图片已保存:{file_path}")
else:
logger.info(f"{file_path.stem}图片下载失败,请检查网络连通性")
logger.info(f"{file_path.stem}图片下载失败,请检查网络连通性")
except Exception as err:
logger.error(f"{file_path.stem}图片下载失败:{str(err)}")

View File

@ -56,12 +56,12 @@ class Emby:
return []
req_url = "%semby/Library/SelectableMediaFolders?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
return res.json()
else:
logger.error(f"Library/SelectableMediaFolders 未获取到返回数据")
return []
with RequestUtils().get_res(req_url) as res:
if res:
return res.json()
else:
logger.error(f"Library/SelectableMediaFolders 未获取到返回数据")
return []
except Exception as e:
logger.error(f"连接Library/SelectableMediaFolders 出错:" + str(e))
return []
@ -74,29 +74,29 @@ class Emby:
return []
req_url = "%semby/Library/VirtualFolders/Query?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
library_items = res.json().get("Items")
librarys = []
for library_item in library_items:
library_name = library_item.get('Name')
pathInfos = library_item.get('LibraryOptions', {}).get('PathInfos')
library_paths = []
for path in pathInfos:
if path.get('NetworkPath'):
library_paths.append(path.get('NetworkPath'))
else:
library_paths.append(path.get('Path'))
with RequestUtils().get_res(req_url) as res:
if res:
library_items = res.json().get("Items")
librarys = []
for library_item in library_items:
library_name = library_item.get('Name')
pathInfos = library_item.get('LibraryOptions', {}).get('PathInfos')
library_paths = []
for path in pathInfos:
if path.get('NetworkPath'):
library_paths.append(path.get('NetworkPath'))
else:
library_paths.append(path.get('Path'))
if library_name and library_paths:
librarys.append({
'Name': library_name,
'Path': library_paths
})
return librarys
else:
logger.error(f"Library/VirtualFolders/Query 未获取到返回数据")
return []
if library_name and library_paths:
librarys.append({
'Name': library_name,
'Path': library_paths
})
return librarys
else:
logger.error(f"Library/VirtualFolders/Query 未获取到返回数据")
return []
except Exception as e:
logger.error(f"连接Library/VirtualFolders/Query 出错:" + str(e))
return []
@ -113,12 +113,12 @@ class Emby:
user = self.user
req_url = f"{self._host}emby/Users/{user}/Views?api_key={self._apikey}"
try:
res = RequestUtils().get_res(req_url)
if res:
return res.json().get("Items")
else:
logger.error(f"User/Views 未获取到返回数据")
return []
with RequestUtils().get_res(req_url) as res:
if res:
return res.json().get("Items")
else:
logger.error(f"User/Views 未获取到返回数据")
return []
except Exception as e:
logger.error(f"连接User/Views 出错:" + str(e))
return []
@ -164,20 +164,20 @@ class Emby:
return None
req_url = "%sUsers?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
users = res.json()
# 先查询是否有与当前用户名称匹配的
if user_name:
with RequestUtils().get_res(req_url) as res:
if res:
users = res.json()
# 先查询是否有与当前用户名称匹配的
if user_name:
for user in users:
if user.get("Name") == user_name:
return user.get("Id")
# 查询管理员
for user in users:
if user.get("Name") == user_name:
if user.get("Policy", {}).get("IsAdministrator"):
return user.get("Id")
# 查询管理员
for user in users:
if user.get("Policy", {}).get("IsAdministrator"):
return user.get("Id")
else:
logger.error(f"Users 未获取到返回数据")
else:
logger.error(f"Users 未获取到返回数据")
except Exception as e:
logger.error(f"连接Users出错" + str(e))
return None
@ -227,11 +227,11 @@ class Emby:
return None
req_url = "%sSystem/Info?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
return res.json().get("Id")
else:
logger.error(f"System/Info 未获取到返回数据")
with RequestUtils().get_res(req_url) as res:
if res:
return res.json().get("Id")
else:
logger.error(f"System/Info 未获取到返回数据")
except Exception as e:
logger.error(f"连接System/Info出错" + str(e))
@ -245,12 +245,12 @@ class Emby:
return 0
req_url = "%semby/Users/Query?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
return res.json().get("TotalRecordCount")
else:
logger.error(f"Users/Query 未获取到返回数据")
return 0
with RequestUtils().get_res(req_url) as res:
if res:
return res.json().get("TotalRecordCount")
else:
logger.error(f"Users/Query 未获取到返回数据")
return 0
except Exception as e:
logger.error(f"连接Users/Query出错" + str(e))
return 0
@ -264,17 +264,17 @@ class Emby:
return schemas.Statistic()
req_url = "%semby/Items/Counts?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
result = res.json()
return schemas.Statistic(
movie_count=result.get("MovieCount") or 0,
tv_count=result.get("SeriesCount") or 0,
episode_count=result.get("EpisodeCount") or 0
)
else:
logger.error(f"Items/Counts 未获取到返回数据")
return schemas.Statistic()
with RequestUtils().get_res(req_url) as res:
if res:
result = res.json()
return schemas.Statistic(
movie_count=result.get("MovieCount") or 0,
tv_count=result.get("SeriesCount") or 0,
episode_count=result.get("EpisodeCount") or 0
)
else:
logger.error(f"Items/Counts 未获取到返回数据")
return schemas.Statistic()
except Exception as e:
logger.error(f"连接Items/Counts出错" + str(e))
return schemas.Statistic()
@ -299,14 +299,14 @@ class Emby:
"&api_key=%s") % (
self._host, name, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
res_items = res.json().get("Items")
if res_items:
for res_item in res_items:
if res_item.get('Name') == name and (
not year or str(res_item.get('ProductionYear')) == str(year)):
return res_item.get('Id')
with RequestUtils().get_res(req_url) as res:
if res:
res_items = res.json().get("Items")
if res_items:
for res_item in res_items:
if res_item.get('Name') == name and (
not year or str(res_item.get('ProductionYear')) == str(year)):
return res_item.get('Id')
except Exception as e:
logger.error(f"连接Items出错" + str(e))
return None
@ -329,36 +329,36 @@ class Emby:
"&Recursive=true&SearchTerm=%s&Limit=10&IncludeSearchTypes=false&api_key=%s" % (
self._host, title, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
res_items = res.json().get("Items")
if res_items:
ret_movies = []
for res_item in res_items:
item_tmdbid = res_item.get("ProviderIds", {}).get("Tmdb")
mediaserver_item = schemas.MediaServerItem(
server="emby",
library=res_item.get("ParentId"),
item_id=res_item.get("Id"),
item_type=res_item.get("Type"),
title=res_item.get("Name"),
original_title=res_item.get("OriginalTitle"),
year=res_item.get("ProductionYear"),
tmdbid=int(item_tmdbid) if item_tmdbid else None,
imdbid=res_item.get("ProviderIds", {}).get("Imdb"),
tvdbid=res_item.get("ProviderIds", {}).get("Tvdb"),
path=res_item.get("Path")
)
if tmdb_id and item_tmdbid:
if str(item_tmdbid) != str(tmdb_id):
continue
else:
with RequestUtils().get_res(req_url) as res:
if res:
res_items = res.json().get("Items")
if res_items:
ret_movies = []
for res_item in res_items:
item_tmdbid = res_item.get("ProviderIds", {}).get("Tmdb")
mediaserver_item = schemas.MediaServerItem(
server="emby",
library=res_item.get("ParentId"),
item_id=res_item.get("Id"),
item_type=res_item.get("Type"),
title=res_item.get("Name"),
original_title=res_item.get("OriginalTitle"),
year=res_item.get("ProductionYear"),
tmdbid=int(item_tmdbid) if item_tmdbid else None,
imdbid=res_item.get("ProviderIds", {}).get("Imdb"),
tvdbid=res_item.get("ProviderIds", {}).get("Tvdb"),
path=res_item.get("Path")
)
if tmdb_id and item_tmdbid:
if str(item_tmdbid) != str(tmdb_id):
continue
else:
ret_movies.append(mediaserver_item)
continue
if (mediaserver_item.title == title
and (not year or str(mediaserver_item.year) == str(year))):
ret_movies.append(mediaserver_item)
continue
if (mediaserver_item.title == title
and (not year or str(mediaserver_item.year) == str(year))):
ret_movies.append(mediaserver_item)
return ret_movies
return ret_movies
except Exception as e:
logger.error(f"连接Items出错" + str(e))
return None
@ -401,25 +401,25 @@ class Emby:
try:
req_url = "%semby/Shows/%s/Episodes?Season=%s&IsMissing=false&api_key=%s" % (
self._host, item_id, season, self._apikey)
res_json = RequestUtils().get_res(req_url)
if res_json:
tv_item = res_json.json()
res_items = tv_item.get("Items")
season_episodes = {}
for res_item in res_items:
season_index = res_item.get("ParentIndexNumber")
if not season_index:
continue
if season and season != season_index:
continue
episode_index = res_item.get("IndexNumber")
if not episode_index:
continue
if season_index not in season_episodes:
season_episodes[season_index] = []
season_episodes[season_index].append(episode_index)
# 返回
return item_id, season_episodes
with RequestUtils().get_res(req_url) as res_json:
if res_json:
tv_item = res_json.json()
res_items = tv_item.get("Items")
season_episodes = {}
for res_item in res_items:
season_index = res_item.get("ParentIndexNumber")
if not season_index:
continue
if season and season != season_index:
continue
episode_index = res_item.get("IndexNumber")
if not episode_index:
continue
if season_index not in season_episodes:
season_episodes[season_index] = []
season_episodes[season_index].append(episode_index)
# 返回
return item_id, season_episodes
except Exception as e:
logger.error(f"连接Shows/Id/Episodes出错" + str(e))
return None, None
@ -464,13 +464,13 @@ class Emby:
req_url = "%sItems/%s/Images/%s" % (self._playhost, item_id, image_type)
try:
res = RequestUtils().get_res(req_url)
if res and res.status_code != 404:
logger.info("影片图片链接:{}".format(res.url))
return res.url
else:
logger.error("Items/Id/Images 未获取到返回数据或无该影片{}图片".format(image_type))
return None
with RequestUtils().get_res(req_url) as res:
if res and res.status_code != 404:
logger.info("影片图片链接:{}".format(res.url))
return res.url
else:
logger.error("Items/Id/Images 未获取到返回数据或无该影片{}图片".format(image_type))
return None
except Exception as e:
logger.error(f"连接Items/Id/Images出错" + str(e))
return None
@ -483,11 +483,11 @@ class Emby:
return False
req_url = "%semby/Items/%s/Refresh?Recursive=true&api_key=%s" % (self._host, item_id, self._apikey)
try:
res = RequestUtils().post_res(req_url)
if res:
return True
else:
logger.info(f"刷新媒体库对象 {item_id} 失败无法连接Emby")
with RequestUtils().post_res(req_url) as res:
if res:
return True
else:
logger.info(f"刷新媒体库对象 {item_id} 失败无法连接Emby")
except Exception as e:
logger.error(f"连接Items/Id/Refresh出错" + str(e))
return False
@ -501,11 +501,11 @@ class Emby:
return False
req_url = "%semby/Library/Refresh?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().post_res(req_url)
if res:
return True
else:
logger.info(f"刷新媒体库失败无法连接Emby")
with RequestUtils().post_res(req_url) as res:
if res:
return True
else:
logger.info(f"刷新媒体库失败无法连接Emby")
except Exception as e:
logger.error(f"连接Library/Refresh出错" + str(e))
return False
@ -580,23 +580,23 @@ class Emby:
return None
req_url = "%semby/Users/%s/Items/%s?api_key=%s" % (self._host, self.user, itemid, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res and res.status_code == 200:
item = res.json()
tmdbid = item.get("ProviderIds", {}).get("Tmdb")
return schemas.MediaServerItem(
server="emby",
library=item.get("ParentId"),
item_id=item.get("Id"),
item_type=item.get("Type"),
title=item.get("Name"),
original_title=item.get("OriginalTitle"),
year=item.get("ProductionYear"),
tmdbid=int(tmdbid) if tmdbid else None,
imdbid=item.get("ProviderIds", {}).get("Imdb"),
tvdbid=item.get("ProviderIds", {}).get("Tvdb"),
path=item.get("Path")
)
with RequestUtils().get_res(req_url) as res:
if res and res.status_code == 200:
item = res.json()
tmdbid = item.get("ProviderIds", {}).get("Tmdb")
return schemas.MediaServerItem(
server="emby",
library=item.get("ParentId"),
item_id=item.get("Id"),
item_type=item.get("Type"),
title=item.get("Name"),
original_title=item.get("OriginalTitle"),
year=item.get("ProductionYear"),
tmdbid=int(tmdbid) if tmdbid else None,
imdbid=item.get("ProviderIds", {}).get("Imdb"),
tvdbid=item.get("ProviderIds", {}).get("Tvdb"),
path=item.get("Path")
)
except Exception as e:
logger.error(f"连接Items/Id出错" + str(e))
return None
@ -611,17 +611,17 @@ class Emby:
yield None
req_url = "%semby/Users/%s/Items?ParentId=%s&api_key=%s" % (self._host, self.user, parent, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res and res.status_code == 200:
results = res.json().get("Items") or []
for result in results:
if not result:
continue
if result.get("Type") in ["Movie", "Series"]:
yield self.get_iteminfo(result.get("Id"))
elif "Folder" in result.get("Type"):
for item in self.get_items(parent=result.get('Id')):
yield item
with RequestUtils().get_res(req_url) as res:
if res and res.status_code == 200:
results = res.json().get("Items") or []
for result in results:
if not result:
continue
if result.get("Type") in ["Movie", "Series"]:
yield self.get_iteminfo(result.get("Id"))
elif "Folder" in result.get("Type"):
for item in self.get_items(parent=result.get('Id')):
yield item
except Exception as e:
logger.error(f"连接Users/Items出错" + str(e))
yield None
@ -1033,52 +1033,52 @@ class Emby:
req_url = (f"{self._host}Users/{user}/Items/Resume?"
f"Limit=100&MediaTypes=Video&api_key={self._apikey}&Fields=ProductionYear,Path")
try:
res = RequestUtils().get_res(req_url)
if res:
result = res.json().get("Items") or []
ret_resume = []
# 用户媒体库文件夹列表(排除黑名单)
library_folders = self.get_user_library_folders()
for item in result:
if len(ret_resume) == num:
break
if item.get("Type") not in ["Movie", "Episode"]:
continue
item_path = item.get("Path")
if item_path and library_folders and not any(
str(item_path).startswith(folder) for folder in library_folders):
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.get_play_url(item.get("Id"))
if item_type == MediaType.MOVIE.value:
title = item.get("Name")
subtitle = item.get("ProductionYear")
else:
title = f'{item.get("SeriesName")}'
subtitle = f'S{item.get("ParentIndexNumber")}:{item.get("IndexNumber")} - {item.get("Name")}'
if item_type == MediaType.MOVIE.value:
if item.get("BackdropImageTags"):
image = self.__get_backdrop_url(item_id=item.get("Id"),
image_tag=item.get("BackdropImageTags")[0])
with RequestUtils().get_res(req_url) as res:
if res:
result = res.json().get("Items") or []
ret_resume = []
# 用户媒体库文件夹列表(排除黑名单)
library_folders = self.get_user_library_folders()
for item in result:
if len(ret_resume) == num:
break
if item.get("Type") not in ["Movie", "Episode"]:
continue
item_path = item.get("Path")
if item_path and library_folders and not any(
str(item_path).startswith(folder) for folder in library_folders):
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.get_play_url(item.get("Id"))
if item_type == MediaType.MOVIE.value:
title = item.get("Name")
subtitle = item.get("ProductionYear")
else:
image = self.__get_local_image_by_id(item.get("Id"))
else:
image = self.__get_backdrop_url(item_id=item.get("SeriesId"),
image_tag=item.get("SeriesPrimaryImageTag"))
if not image:
image = self.__get_local_image_by_id(item.get("SeriesId"))
ret_resume.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
title=title,
subtitle=subtitle,
type=item_type,
image=image,
link=link,
percent=item.get("UserData", {}).get("PlayedPercentage")
))
return ret_resume
else:
logger.error(f"Users/Items/Resume 未获取到返回数据")
title = f'{item.get("SeriesName")}'
subtitle = f'S{item.get("ParentIndexNumber")}:{item.get("IndexNumber")} - {item.get("Name")}'
if item_type == MediaType.MOVIE.value:
if item.get("BackdropImageTags"):
image = self.__get_backdrop_url(item_id=item.get("Id"),
image_tag=item.get("BackdropImageTags")[0])
else:
image = self.__get_local_image_by_id(item.get("Id"))
else:
image = self.__get_backdrop_url(item_id=item.get("SeriesId"),
image_tag=item.get("SeriesPrimaryImageTag"))
if not image:
image = self.__get_local_image_by_id(item.get("SeriesId"))
ret_resume.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
title=title,
subtitle=subtitle,
type=item_type,
image=image,
link=link,
percent=item.get("UserData", {}).get("PlayedPercentage")
))
return ret_resume
else:
logger.error(f"Users/Items/Resume 未获取到返回数据")
except Exception as e:
logger.error(f"连接Users/Items/Resume出错" + str(e))
return []
@ -1096,35 +1096,35 @@ class Emby:
req_url = (f"{self._host}Users/{user}/Items/Latest?"
f"Limit=100&MediaTypes=Video&api_key={self._apikey}&Fields=ProductionYear,Path")
try:
res = RequestUtils().get_res(req_url)
if res:
result = res.json() or []
ret_latest = []
# 用户媒体库文件夹列表(排除黑名单)
library_folders = self.get_user_library_folders()
for item in result:
if len(ret_latest) == num:
break
if item.get("Type") not in ["Movie", "Series"]:
continue
item_path = item.get("Path")
if item_path and library_folders and not any(
str(item_path).startswith(folder) for folder in library_folders):
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.get_play_url(item.get("Id"))
image = self.__get_local_image_by_id(item_id=item.get("Id"))
ret_latest.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
title=item.get("Name"),
subtitle=item.get("ProductionYear"),
type=item_type,
image=image,
link=link
))
return ret_latest
else:
logger.error(f"Users/Items/Latest 未获取到返回数据")
with RequestUtils().get_res(req_url) as res:
if res:
result = res.json() or []
ret_latest = []
# 用户媒体库文件夹列表(排除黑名单)
library_folders = self.get_user_library_folders()
for item in result:
if len(ret_latest) == num:
break
if item.get("Type") not in ["Movie", "Series"]:
continue
item_path = item.get("Path")
if item_path and library_folders and not any(
str(item_path).startswith(folder) for folder in library_folders):
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.get_play_url(item.get("Id"))
image = self.__get_local_image_by_id(item_id=item.get("Id"))
ret_latest.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
title=item.get("Name"),
subtitle=item.get("ProductionYear"),
type=item_type,
image=image,
link=link
))
return ret_latest
else:
logger.error(f"Users/Items/Latest 未获取到返回数据")
except Exception as e:
logger.error(f"连接Users/Items/Latest出错" + str(e))
return []

View File

@ -321,11 +321,11 @@ class FanartModule(_ModuleBase):
"""
测试模块连接性
"""
ret = RequestUtils().get_res("https://webservice.fanart.tv")
if ret and ret.status_code == 200:
return True, ""
elif ret:
return False, f"无法连接fanart错误码{ret.status_code}"
with RequestUtils().get_res("https://webservice.fanart.tv") as ret:
if ret and ret.status_code == 200:
return True, ""
elif ret:
return False, f"无法连接fanart错误码{ret.status_code}"
return False, "fanart网络连接失败"
def init_setting(self) -> Tuple[str, Union[str, bool]]:

View File

@ -52,12 +52,12 @@ class Jellyfin:
return []
req_url = "%sLibrary/SelectableMediaFolders?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
return res.json()
else:
logger.error(f"Library/SelectableMediaFolders 未获取到返回数据")
return []
with RequestUtils().get_res(req_url) as res:
if res:
return res.json()
else:
logger.error(f"Library/SelectableMediaFolders 未获取到返回数据")
return []
except Exception as e:
logger.error(f"连接Library/SelectableMediaFolders 出错:" + str(e))
return []
@ -70,29 +70,29 @@ class Jellyfin:
return []
req_url = "%sLibrary/VirtualFolders?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
library_items = res.json()
librarys = []
for library_item in library_items:
library_name = library_item.get('Name')
pathInfos = library_item.get('LibraryOptions', {}).get('PathInfos')
library_paths = []
for path in pathInfos:
if path.get('NetworkPath'):
library_paths.append(path.get('NetworkPath'))
else:
library_paths.append(path.get('Path'))
with RequestUtils().get_res(req_url) as res:
if res:
library_items = res.json()
librarys = []
for library_item in library_items:
library_name = library_item.get('Name')
pathInfos = library_item.get('LibraryOptions', {}).get('PathInfos')
library_paths = []
for path in pathInfos:
if path.get('NetworkPath'):
library_paths.append(path.get('NetworkPath'))
else:
library_paths.append(path.get('Path'))
if library_name and library_paths:
librarys.append({
'Name': library_name,
'Path': library_paths
})
return librarys
else:
logger.error(f"Library/VirtualFolders 未获取到返回数据")
return []
if library_name and library_paths:
librarys.append({
'Name': library_name,
'Path': library_paths
})
return librarys
else:
logger.error(f"Library/VirtualFolders 未获取到返回数据")
return []
except Exception as e:
logger.error(f"连接Library/VirtualFolders 出错:" + str(e))
return []
@ -109,12 +109,12 @@ class Jellyfin:
user = self.user
req_url = f"{self._host}Users/{user}/Views?api_key={self._apikey}"
try:
res = RequestUtils().get_res(req_url)
if res:
return res.json().get("Items")
else:
logger.error(f"Users/Views 未获取到返回数据")
return []
with RequestUtils().get_res(req_url) as res:
if res:
return res.json().get("Items")
else:
logger.error(f"Users/Views 未获取到返回数据")
return []
except Exception as e:
logger.error(f"连接Users/Views 出错:" + str(e))
return []
@ -163,12 +163,12 @@ class Jellyfin:
return 0
req_url = "%sUsers?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
return len(res.json())
else:
logger.error(f"Users 未获取到返回数据")
return 0
with RequestUtils().get_res(req_url) as res:
if res:
return len(res.json())
else:
logger.error(f"Users 未获取到返回数据")
return 0
except Exception as e:
logger.error(f"连接Users出错" + str(e))
return 0
@ -181,20 +181,20 @@ class Jellyfin:
return None
req_url = "%sUsers?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
users = res.json()
# 先查询是否有与当前用户名称匹配的
if user_name:
with RequestUtils().get_res(req_url) as res:
if res:
users = res.json()
# 先查询是否有与当前用户名称匹配的
if user_name:
for user in users:
if user.get("Name") == user_name:
return user.get("Id")
# 查询管理员
for user in users:
if user.get("Name") == user_name:
if user.get("Policy", {}).get("IsAdministrator"):
return user.get("Id")
# 查询管理员
for user in users:
if user.get("Policy", {}).get("IsAdministrator"):
return user.get("Id")
else:
logger.error(f"Users 未获取到返回数据")
else:
logger.error(f"Users 未获取到返回数据")
except Exception as e:
logger.error(f"连接Users出错" + str(e))
return None
@ -244,11 +244,11 @@ class Jellyfin:
return None
req_url = "%sSystem/Info?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
return res.json().get("Id")
else:
logger.error(f"System/Info 未获取到返回数据")
with RequestUtils().get_res(req_url) as res:
if res:
return res.json().get("Id")
else:
logger.error(f"System/Info 未获取到返回数据")
except Exception as e:
logger.error(f"连接System/Info出错" + str(e))
return None
@ -262,17 +262,17 @@ class Jellyfin:
return schemas.Statistic()
req_url = "%sItems/Counts?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
result = res.json()
return schemas.Statistic(
movie_count=result.get("MovieCount") or 0,
tv_count=result.get("SeriesCount") or 0,
episode_count=result.get("EpisodeCount") or 0
)
else:
logger.error(f"Items/Counts 未获取到返回数据")
return schemas.Statistic()
with RequestUtils().get_res(req_url) as res:
if res:
result = res.json()
return schemas.Statistic(
movie_count=result.get("MovieCount") or 0,
tv_count=result.get("SeriesCount") or 0,
episode_count=result.get("EpisodeCount") or 0
)
else:
logger.error(f"Items/Counts 未获取到返回数据")
return schemas.Statistic()
except Exception as e:
logger.error(f"连接Items/Counts出错" + str(e))
return schemas.Statistic()
@ -287,14 +287,14 @@ class Jellyfin:
"api_key=%s&searchTerm=%s&IncludeItemTypes=Series&Limit=10&Recursive=true") % (
self._host, self.user, self._apikey, name)
try:
res = RequestUtils().get_res(req_url)
if res:
res_items = res.json().get("Items")
if res_items:
for res_item in res_items:
if res_item.get('Name') == name and (
not year or str(res_item.get('ProductionYear')) == str(year)):
return res_item.get('Id')
with RequestUtils().get_res(req_url) as res:
if res:
res_items = res.json().get("Items")
if res_items:
for res_item in res_items:
if res_item.get('Name') == name and (
not year or str(res_item.get('ProductionYear')) == str(year)):
return res_item.get('Id')
except Exception as e:
logger.error(f"连接Items出错" + str(e))
return None
@ -317,36 +317,36 @@ class Jellyfin:
"api_key=%s&searchTerm=%s&IncludeItemTypes=Movie&Limit=10&Recursive=true") % (
self._host, self.user, self._apikey, title)
try:
res = RequestUtils().get_res(req_url)
if res:
res_items = res.json().get("Items")
if res_items:
ret_movies = []
for item in res_items:
item_tmdbid = item.get("ProviderIds", {}).get("Tmdb")
mediaserver_item = schemas.MediaServerItem(
server="jellyfin",
library=item.get("ParentId"),
item_id=item.get("Id"),
item_type=item.get("Type"),
title=item.get("Name"),
original_title=item.get("OriginalTitle"),
year=item.get("ProductionYear"),
tmdbid=int(item_tmdbid) if item_tmdbid else None,
imdbid=item.get("ProviderIds", {}).get("Imdb"),
tvdbid=item.get("ProviderIds", {}).get("Tvdb"),
path=item.get("Path")
)
if tmdb_id and item_tmdbid:
if str(item_tmdbid) != str(tmdb_id):
continue
else:
with RequestUtils().get_res(req_url) as res:
if res:
res_items = res.json().get("Items")
if res_items:
ret_movies = []
for item in res_items:
item_tmdbid = item.get("ProviderIds", {}).get("Tmdb")
mediaserver_item = schemas.MediaServerItem(
server="jellyfin",
library=item.get("ParentId"),
item_id=item.get("Id"),
item_type=item.get("Type"),
title=item.get("Name"),
original_title=item.get("OriginalTitle"),
year=item.get("ProductionYear"),
tmdbid=int(item_tmdbid) if item_tmdbid else None,
imdbid=item.get("ProviderIds", {}).get("Imdb"),
tvdbid=item.get("ProviderIds", {}).get("Tvdb"),
path=item.get("Path")
)
if tmdb_id and item_tmdbid:
if str(item_tmdbid) != str(tmdb_id):
continue
else:
ret_movies.append(mediaserver_item)
continue
if mediaserver_item.title == title and (
not year or str(mediaserver_item.year) == str(year)):
ret_movies.append(mediaserver_item)
continue
if mediaserver_item.title == title and (
not year or str(mediaserver_item.year) == str(year)):
ret_movies.append(mediaserver_item)
return ret_movies
return ret_movies
except Exception as e:
logger.error(f"连接Items出错" + str(e))
return None
@ -387,25 +387,25 @@ class Jellyfin:
try:
req_url = "%sShows/%s/Episodes?season=%s&&userId=%s&isMissing=false&api_key=%s" % (
self._host, item_id, season, self.user, self._apikey)
res_json = RequestUtils().get_res(req_url)
if res_json:
tv_info = res_json.json()
res_items = tv_info.get("Items")
# 返回的季集信息
season_episodes = {}
for res_item in res_items:
season_index = res_item.get("ParentIndexNumber")
if not season_index:
continue
if season and season != season_index:
continue
episode_index = res_item.get("IndexNumber")
if not episode_index:
continue
if not season_episodes.get(season_index):
season_episodes[season_index] = []
season_episodes[season_index].append(episode_index)
return item_id, season_episodes
with RequestUtils().get_res(req_url) as res_json:
if res_json:
tv_info = res_json.json()
res_items = tv_info.get("Items")
# 返回的季集信息
season_episodes = {}
for res_item in res_items:
season_index = res_item.get("ParentIndexNumber")
if not season_index:
continue
if season and season != season_index:
continue
episode_index = res_item.get("IndexNumber")
if not episode_index:
continue
if not season_episodes.get(season_index):
season_episodes[season_index] = []
season_episodes[season_index].append(episode_index)
return item_id, season_episodes
except Exception as e:
logger.error(f"连接Shows/Id/Episodes出错" + str(e))
return None, None
@ -458,13 +458,13 @@ class Jellyfin:
_host = self._playhost
req_url = "%sItems/%s/Images/%s" % (_host, item_id, image_type)
try:
res = RequestUtils().get_res(req_url)
if res and res.status_code != 404:
logger.info("影片图片链接:{}".format(res.url))
return res.url
else:
logger.error("Items/Id/Images 未获取到返回数据或无该影片{}图片".format(image_type))
return None
with RequestUtils().get_res(req_url) as res:
if res and res.status_code != 404:
logger.info("影片图片链接:{}".format(res.url))
return res.url
else:
logger.error("Items/Id/Images 未获取到返回数据或无该影片{}图片".format(image_type))
return None
except Exception as e:
logger.error(f"连接Items/Id/Images出错" + str(e))
return None
@ -479,12 +479,12 @@ class Jellyfin:
"""
req_url = "%sItems/%s/Ancestors?api_key=%s" % (self._host, item_id, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res:
return res.json()[index].get(key)
else:
logger.error(f"Items/Id/Ancestors 未获取到返回数据")
return None
with RequestUtils().get_res(req_url) as res:
if res:
return res.json()[index].get(key)
else:
logger.error(f"Items/Id/Ancestors 未获取到返回数据")
return None
except Exception as e:
logger.error(f"连接Items/Id/Ancestors出错" + str(e))
return None
@ -497,11 +497,11 @@ class Jellyfin:
return False
req_url = "%sLibrary/Refresh?api_key=%s" % (self._host, self._apikey)
try:
res = RequestUtils().post_res(req_url)
if res:
return True
else:
logger.info(f"刷新媒体库失败无法连接Jellyfin")
with RequestUtils().post_res(req_url) as res:
if res:
return True
else:
logger.info(f"刷新媒体库失败无法连接Jellyfin")
except Exception as e:
logger.error(f"连接Library/Refresh出错" + str(e))
return False
@ -632,23 +632,23 @@ class Jellyfin:
req_url = "%sUsers/%s/Items/%s?api_key=%s" % (
self._host, self.user, itemid, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res and res.status_code == 200:
item = res.json()
tmdbid = item.get("ProviderIds", {}).get("Tmdb")
return schemas.MediaServerItem(
server="jellyfin",
library=item.get("ParentId"),
item_id=item.get("Id"),
item_type=item.get("Type"),
title=item.get("Name"),
original_title=item.get("OriginalTitle"),
year=item.get("ProductionYear"),
tmdbid=int(tmdbid) if tmdbid else None,
imdbid=item.get("ProviderIds", {}).get("Imdb"),
tvdbid=item.get("ProviderIds", {}).get("Tvdb"),
path=item.get("Path")
)
with RequestUtils().get_res(req_url) as res:
if res and res.status_code == 200:
item = res.json()
tmdbid = item.get("ProviderIds", {}).get("Tmdb")
return schemas.MediaServerItem(
server="jellyfin",
library=item.get("ParentId"),
item_id=item.get("Id"),
item_type=item.get("Type"),
title=item.get("Name"),
original_title=item.get("OriginalTitle"),
year=item.get("ProductionYear"),
tmdbid=int(tmdbid) if tmdbid else None,
imdbid=item.get("ProviderIds", {}).get("Imdb"),
tvdbid=item.get("ProviderIds", {}).get("Tvdb"),
path=item.get("Path")
)
except Exception as e:
logger.error(f"连接Users/Items出错" + str(e))
return None
@ -663,17 +663,17 @@ class Jellyfin:
yield None
req_url = "%sUsers/%s/Items?parentId=%s&api_key=%s" % (self._host, self.user, parent, self._apikey)
try:
res = RequestUtils().get_res(req_url)
if res and res.status_code == 200:
results = res.json().get("Items") or []
for result in results:
if not result:
continue
if result.get("Type") in ["Movie", "Series"]:
yield self.get_iteminfo(result.get("Id"))
elif "Folder" in result.get("Type"):
for item in self.get_items(result.get("Id")):
yield item
with RequestUtils().get_res(req_url) as res:
if res and res.status_code == 200:
results = res.json().get("Items") or []
for result in results:
if not result:
continue
if result.get("Type") in ["Movie", "Series"]:
yield self.get_iteminfo(result.get("Id"))
elif "Folder" in result.get("Type"):
for item in self.get_items(result.get("Id")):
yield item
except Exception as e:
logger.error(f"连接Users/Items出错" + str(e))
yield None
@ -761,50 +761,50 @@ class Jellyfin:
req_url = (f"{self._host}Users/{user}/Items/Resume?"
f"Limit=100&MediaTypes=Video&api_key={self._apikey}&Fields=ProductionYear,Path")
try:
res = RequestUtils().get_res(req_url)
if res:
result = res.json().get("Items") or []
ret_resume = []
# 用户媒体库文件夹列表(排除黑名单)
library_folders = self.get_user_library_folders()
for item in result:
if len(ret_resume) == num:
break
if item.get("Type") not in ["Movie", "Episode"]:
continue
item_path = item.get("Path")
if item_path and library_folders and not any(
str(item_path).startswith(folder) for folder in library_folders):
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.get_play_url(item.get("Id"))
if item.get("BackdropImageTags"):
image = self.__get_backdrop_url(item_id=item.get("Id"),
image_tag=item.get("BackdropImageTags")[0])
else:
image = self.__get_local_image_by_id(item.get("Id"))
# 小部分剧集无[xxx-S01E01-thumb.jpg]图片
image_res = RequestUtils().get_res(image)
if not image_res or image_res.status_code == 404:
image = self.generate_image_link(item.get("Id"), "Backdrop", False)
if item_type == MediaType.MOVIE.value:
title = item.get("Name")
subtitle = item.get("ProductionYear")
else:
title = f'{item.get("SeriesName")}'
subtitle = f'S{item.get("ParentIndexNumber")}:{item.get("IndexNumber")} - {item.get("Name")}'
ret_resume.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
title=title,
subtitle=subtitle,
type=item_type,
image=image,
link=link,
percent=item.get("UserData", {}).get("PlayedPercentage")
))
return ret_resume
else:
logger.error(f"Users/Items/Resume 未获取到返回数据")
with RequestUtils().get_res(req_url) as res:
if res:
result = res.json().get("Items") or []
ret_resume = []
# 用户媒体库文件夹列表(排除黑名单)
library_folders = self.get_user_library_folders()
for item in result:
if len(ret_resume) == num:
break
if item.get("Type") not in ["Movie", "Episode"]:
continue
item_path = item.get("Path")
if item_path and library_folders and not any(
str(item_path).startswith(folder) for folder in library_folders):
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.get_play_url(item.get("Id"))
if item.get("BackdropImageTags"):
image = self.__get_backdrop_url(item_id=item.get("Id"),
image_tag=item.get("BackdropImageTags")[0])
else:
image = self.__get_local_image_by_id(item.get("Id"))
# 小部分剧集无[xxx-S01E01-thumb.jpg]图片
with RequestUtils().get_res(image) as image_res:
if not image_res or image_res.status_code == 404:
image = self.generate_image_link(item.get("Id"), "Backdrop", False)
if item_type == MediaType.MOVIE.value:
title = item.get("Name")
subtitle = item.get("ProductionYear")
else:
title = f'{item.get("SeriesName")}'
subtitle = f'S{item.get("ParentIndexNumber")}:{item.get("IndexNumber")} - {item.get("Name")}'
ret_resume.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
title=title,
subtitle=subtitle,
type=item_type,
image=image,
link=link,
percent=item.get("UserData", {}).get("PlayedPercentage")
))
return ret_resume
else:
logger.error(f"Users/Items/Resume 未获取到返回数据")
except Exception as e:
logger.error(f"连接Users/Items/Resume出错" + str(e))
return []
@ -822,35 +822,35 @@ class Jellyfin:
req_url = (f"{self._host}Users/{user}/Items/Latest?"
f"Limit=100&MediaTypes=Video&api_key={self._apikey}&Fields=ProductionYear,Path")
try:
res = RequestUtils().get_res(req_url)
if res:
result = res.json() or []
ret_latest = []
# 用户媒体库文件夹列表(排除黑名单)
library_folders = self.get_user_library_folders()
for item in result:
if len(ret_latest) == num:
break
if item.get("Type") not in ["Movie", "Series"]:
continue
item_path = item.get("Path")
if item_path and library_folders and not any(
str(item_path).startswith(folder) for folder in library_folders):
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.get_play_url(item.get("Id"))
image = self.__get_local_image_by_id(item_id=item.get("Id"))
ret_latest.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
title=item.get("Name"),
subtitle=item.get("ProductionYear"),
type=item_type,
image=image,
link=link
))
return ret_latest
else:
logger.error(f"Users/Items/Latest 未获取到返回数据")
with RequestUtils().get_res(req_url) as res:
if res:
result = res.json() or []
ret_latest = []
# 用户媒体库文件夹列表(排除黑名单)
library_folders = self.get_user_library_folders()
for item in result:
if len(ret_latest) == num:
break
if item.get("Type") not in ["Movie", "Series"]:
continue
item_path = item.get("Path")
if item_path and library_folders and not any(
str(item_path).startswith(folder) for folder in library_folders):
continue
item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value
link = self.get_play_url(item.get("Id"))
image = self.__get_local_image_by_id(item_id=item.get("Id"))
ret_latest.append(schemas.MediaServerPlayItem(
id=item.get("Id"),
title=item.get("Name"),
subtitle=item.get("ProductionYear"),
type=item_type,
image=image,
link=link
))
return ret_latest
else:
logger.error(f"Users/Items/Latest 未获取到返回数据")
except Exception as e:
logger.error(f"连接Users/Items/Latest出错" + str(e))
return []

View File

@ -41,32 +41,32 @@ class Slack:
# 注册消息响应
@slack_app.event("message")
def slack_message(message):
local_res = requests.post(self._ds_url, json=message, timeout=10)
logger.debug("message: %s processed, response is: %s" % (message, local_res.text))
with requests.post(self._ds_url, json=message, timeout=10) as local_res:
logger.debug("message: %s processed, response is: %s" % (message, local_res.text))
@slack_app.action(re.compile(r"actionId-\d+"))
def slack_action(ack, body):
ack()
local_res = requests.post(self._ds_url, json=body, timeout=60)
logger.debug("message: %s processed, response is: %s" % (body, local_res.text))
with requests.post(self._ds_url, json=body, timeout=60) as local_res:
logger.debug("message: %s processed, response is: %s" % (body, local_res.text))
@slack_app.event("app_mention")
def slack_mention(say, body):
say(f"收到,请稍等... <@{body.get('event', {}).get('user')}>")
local_res = requests.post(self._ds_url, json=body, timeout=10)
logger.debug("message: %s processed, response is: %s" % (body, local_res.text))
with requests.post(self._ds_url, json=body, timeout=10) as local_res:
logger.debug("message: %s processed, response is: %s" % (body, local_res.text))
@slack_app.shortcut(re.compile(r"/*"))
def slack_shortcut(ack, body):
ack()
local_res = requests.post(self._ds_url, json=body, timeout=10)
logger.debug("message: %s processed, response is: %s" % (body, local_res.text))
with requests.post(self._ds_url, json=body, timeout=10) as local_res:
logger.debug("message: %s processed, response is: %s" % (body, local_res.text))
@slack_app.command(re.compile(r"/*"))
def slack_command(ack, body):
ack()
local_res = requests.post(self._ds_url, json=body, timeout=10)
logger.debug("message: %s processed, response is: %s" % (body, local_res.text))
with requests.post(self._ds_url, json=body, timeout=10) as local_res:
logger.debug("message: %s processed, response is: %s" % (body, local_res.text))
# 启动服务
try:

View File

@ -12,7 +12,7 @@ from app.modules import _ModuleBase
from app.modules.themoviedb.category import CategoryHelper
from app.modules.themoviedb.scraper import TmdbScraper
from app.modules.themoviedb.tmdb_cache import TmdbCache
from app.modules.themoviedb.tmdbapi import TmdbHelper
from app.modules.themoviedb.tmdbapi import TmdbApi
from app.schemas.types import MediaType, MediaImageType
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
@ -26,7 +26,7 @@ class TheMovieDbModule(_ModuleBase):
# 元数据缓存
cache: TmdbCache = None
# TMDB
tmdb: TmdbHelper = None
tmdb: TmdbApi = None
# 二级分类
category: CategoryHelper = None
# 刮削器
@ -34,12 +34,13 @@ class TheMovieDbModule(_ModuleBase):
def init_module(self) -> None:
self.cache = TmdbCache()
self.tmdb = TmdbHelper()
self.tmdb = TmdbApi()
self.category = CategoryHelper()
self.scraper = TmdbScraper(self.tmdb)
def stop(self):
self.cache.save()
self.tmdb.close()
def test(self) -> Tuple[bool, str]:
"""

View File

@ -15,7 +15,7 @@ from .tmdbv3api import TMDb, Search, Movie, TV, Season, Episode, Discover, Trend
from .tmdbv3api.exceptions import TMDbException
class TmdbHelper:
class TmdbApi:
"""
TMDB识别匹配
"""
@ -1271,3 +1271,9 @@ class TmdbHelper:
except Exception as e:
print(str(e))
return {}
def close(self):
"""
关闭连接
"""
self.tmdb.close()

View File

@ -222,6 +222,6 @@ class TMDb(object):
return json.get(key)
return json
def __del__(self):
def close(self):
if self._session:
self._session.close()

View File

@ -17,7 +17,7 @@ class TheTvDbModule(_ModuleBase):
proxies=settings.PROXY)
def stop(self):
pass
self.tvdb.close()
def test(self) -> Tuple[bool, str]:
"""

View File

@ -733,6 +733,10 @@ class Tvdb:
}
self.proxies = proxies
def close(self):
if self.session:
self.session.close()
@staticmethod
def _getTempDir():
"""Returns the [system temp dir]/tvdb_api-u501 (or
@ -764,17 +768,17 @@ class Tvdb:
if not self.__authorized:
# only authorize of we haven't before and we
# don't have the url in the cache
fake_session_for_key = requests.Session()
fake_session_for_key.headers['Accept-Language'] = language
cache_key = None
try:
# in case the session class has no cache object, fail gracefully
cache_key = self.session.cache.create_key(
fake_session_for_key.prepare_request(requests.Request('GET', url))
)
except Exception:
# FIXME: Can this just check for hasattr(self.session, "cache") instead?
pass
with requests.Session() as fake_session_for_key:
fake_session_for_key.headers['Accept-Language'] = language
try:
# in case the session class has no cache object, fail gracefully
cache_key = self.session.cache.create_key(
fake_session_for_key.prepare_request(requests.Request('GET', url))
)
except Exception:
# FIXME: Can this just check for hasattr(self.session, "cache") instead?
pass
# fmt: off
# No fmt because mangles noqa comment - https://github.com/psf/black/issues/195

View File

@ -71,18 +71,18 @@ class WeChat:
return None
try:
token_url = self._token_url % (self._corpid, self._appsecret)
res = RequestUtils().get_res(token_url)
if res:
ret_json = res.json()
if ret_json.get('errcode') == 0:
self._access_token = ret_json.get('access_token')
self._expires_in = ret_json.get('expires_in')
self._access_token_time = datetime.now()
elif res is not None:
logger.error(f"获取微信access_token失败错误码{res.status_code},错误原因:{res.reason}")
else:
logger.error(f"获取微信access_token失败未获取到返回信息")
raise Exception("获取微信access_token失败网络连接失败")
with RequestUtils().get_res(token_url) as res:
if res:
ret_json = res.json()
if ret_json.get('errcode') == 0:
self._access_token = ret_json.get('access_token')
self._expires_in = ret_json.get('expires_in')
self._access_token_time = datetime.now()
elif res is not None:
logger.error(f"获取微信access_token失败错误码{res.status_code},错误原因:{res.reason}")
else:
logger.error(f"获取微信access_token失败未获取到返回信息")
raise Exception("获取微信access_token失败网络连接失败")
except Exception as e:
logger.error(f"获取微信access_token失败错误信息{str(e)}")
return None

View File

@ -40,9 +40,9 @@ class WebUtils:
}
"""
try:
r = RequestUtils().get_res(f"https://api.mir6.com/api/ip?ip={ip}&type=json")
if r:
return r.json().get("data", {}).get("location") or ''
with RequestUtils().get_res(f"https://api.mir6.com/api/ip?ip={ip}&type=json") as r:
if r:
return r.json().get("data", {}).get("location") or ''
except Exception as err:
print(str(err))
return ""
@ -65,9 +65,9 @@ class WebUtils:
}
"""
try:
r = RequestUtils().get_res(f"https://whois.pconline.com.cn/ipJson.jsp?json=true&ip={ip}")
if r:
return r.json().get("addr") or ''
with RequestUtils().get_res(f"https://whois.pconline.com.cn/ipJson.jsp?json=true&ip={ip}") as r:
if r:
return r.json().get("addr") or ''
except Exception as err:
print(str(err))
return ""