import json from functools import lru_cache from pathlib import Path from typing import List, Optional, Dict, Tuple, Generator, Any from urllib.parse import quote_plus from plexapi import media from plexapi.server import PlexServer from app import schemas from app.core.config import settings from app.log import logger from app.schemas import MediaType class Plex: _plex = None def __init__(self): self._host = settings.PLEX_HOST if self._host: if not self._host.endswith("/"): self._host += "/" if not self._host.startswith("http"): self._host = "http://" + self._host self._playhost = settings.PLEX_PLAY_HOST if self._playhost: if not self._playhost.endswith("/"): self._playhost += "/" if not self._playhost.startswith("http"): self._playhost = "http://" + self._playhost self._token = settings.PLEX_TOKEN if self._host and self._token: try: self._plex = PlexServer(self._host, self._token) self._libraries = self._plex.library.sections() except Exception as e: self._plex = None logger.error(f"Plex服务器连接失败:{str(e)}") def is_inactive(self) -> bool: """ 判断是否需要重连 """ if not self._host or not self._token: return False return True if not self._plex else False def reconnect(self): """ 重连 """ try: self._plex = PlexServer(self._host, self._token) self._libraries = self._plex.library.sections() except Exception as e: self._plex = None logger.error(f"Plex服务器连接失败:{str(e)}") @lru_cache(maxsize=10) def __get_library_images(self, library_key: str, mtype: int) -> Optional[List[str]]: """ 获取媒体服务器最近添加的媒体的图片列表 param: library_key param: type type的含义: 1 电影 2 剧集 详见 plexapi/utils.py中SEARCHTYPES的定义 """ if not self._plex: return None # 返回结果 poster_urls = {} # 页码计数 container_start = 0 # 需要的总条数/每页的条数 total_size = 4 # 如果总数不足,接续获取下一页 while len(poster_urls) < total_size: items = self._plex.fetchItems(f"/hubs/home/recentlyAdded?type={mtype}§ionID={library_key}", container_size=total_size, container_start=container_start) for item in items: if item.type == 'episode': # 如果是剧集的单集,则去找上级的图片 if item.parentThumb is not None: poster_urls[item.parentThumb] = None else: # 否则就用自己的图片 if item.thumb is not None: poster_urls[item.thumb] = None if len(poster_urls) == total_size: break if len(items) < total_size: break container_start += total_size return [f"{self._host.rstrip('/') + url}?X-Plex-Token={self._token}" for url in list(poster_urls.keys())[:total_size]] def get_librarys(self) -> List[schemas.MediaServerLibrary]: """ 获取媒体服务器所有媒体库列表 """ if not self._plex: return [] try: self._libraries = self._plex.library.sections() except Exception as err: logger.error(f"获取媒体服务器所有媒体库列表出错:{str(err)}") return [] libraries = [] black_list = (settings.MEDIASERVER_SYNC_BLACKLIST or '').split(",") for library in self._libraries: if library.title in black_list: continue match library.type: case "movie": library_type = MediaType.MOVIE.value image_list = self.__get_library_images(library.key, 1) case "show": library_type = MediaType.TV.value image_list = self.__get_library_images(library.key, 2) case _: continue libraries.append( schemas.MediaServerLibrary( id=library.key, name=library.title, path=library.locations, type=library_type, image_list=image_list, link=f"{self._playhost or self._host}web/index.html#!/media/{self._plex.machineIdentifier}" f"/com.plexapp.plugins.library?source={library.key}" ) ) return libraries def get_medias_count(self) -> schemas.Statistic: """ 获得电影、电视剧、动漫媒体数量 :return: MovieCount SeriesCount SongCount """ if not self._plex: return schemas.Statistic() sections = self._plex.library.sections() MovieCount = SeriesCount = EpisodeCount = 0 # 媒体库白名单 allow_library = [lib.id for lib in self.get_librarys()] for sec in sections: if str(sec.key) not in allow_library: continue if sec.type == "movie": MovieCount += sec.totalSize if sec.type == "show": SeriesCount += sec.totalSize EpisodeCount += sec.totalViewSize(libtype='episode') return schemas.Statistic( movie_count=MovieCount, tv_count=SeriesCount, episode_count=EpisodeCount ) def get_movies(self, title: str, original_title: str = None, year: str = None, tmdb_id: int = None) -> Optional[List[schemas.MediaServerItem]]: """ 根据标题和年份,检查电影是否在Plex中存在,存在则返回列表 :param title: 标题 :param original_title: 原产地标题 :param year: 年份,为空则不过滤 :param tmdb_id: TMDB ID :return: 含title、year属性的字典列表 """ if not self._plex: return None ret_movies = [] if year: movies = self._plex.library.search(title=title, year=year, libtype="movie") # 根据原标题再查一遍 if original_title and str(original_title) != str(title): movies.extend(self._plex.library.search(title=original_title, year=year, libtype="movie")) else: movies = self._plex.library.search(title=title, libtype="movie") if original_title and str(original_title) != str(title): movies.extend(self._plex.library.search(title=original_title, libtype="movie")) for item in set(movies): ids = self.__get_ids(item.guids) if tmdb_id and ids['tmdb_id']: if str(ids['tmdb_id']) != str(tmdb_id): continue path = None if item.locations: path = item.locations[0] ret_movies.append( schemas.MediaServerItem( server="plex", library=item.librarySectionID, item_id=item.key, item_type=item.type, title=item.title, original_title=item.originalTitle, year=item.year, tmdbid=ids['tmdb_id'], imdbid=ids['imdb_id'], tvdbid=ids['tvdb_id'], path=path, ) ) return ret_movies def get_tv_episodes(self, item_id: str = None, title: str = None, original_title: str = None, year: str = None, tmdb_id: int = None, season: int = None) -> Tuple[Optional[str], Optional[Dict[int, list]]]: """ 根据标题、年份、季查询电视剧所有集信息 :param item_id: 媒体ID :param title: 标题 :param original_title: 原产地标题 :param year: 年份,可以为空,为空时不按年份过滤 :param tmdb_id: TMDB ID :param season: 季号,数字 :return: 所有集的列表 """ if not self._plex: return None, {} if item_id: videos = self._plex.fetchItem(item_id) else: # 兼容年份为空的场景 kwargs = {"year": year} if year else {} # 根据标题和年份模糊搜索,该结果不够准确 videos = self._plex.library.search(title=title, libtype="show", **kwargs) if (not videos and original_title and str(original_title) != str(title)): videos = self._plex.library.search(title=original_title, libtype="show", **kwargs) if not videos: return None, {} if isinstance(videos, list): videos = videos[0] video_tmdbid = self.__get_ids(videos.guids).get('tmdb_id') if tmdb_id and video_tmdbid: if str(video_tmdbid) != str(tmdb_id): return None, {} episodes = videos.episodes() season_episodes = {} for episode in episodes: if season and episode.seasonNumber != int(season): continue if episode.seasonNumber not in season_episodes: season_episodes[episode.seasonNumber] = [] season_episodes[episode.seasonNumber].append(episode.index) return videos.key, season_episodes def get_remote_image_by_id(self, item_id: str, image_type: str) -> Optional[str]: """ 根据ItemId从Plex查询图片地址 :param item_id: 在Emby中的ID :param image_type: 图片的类型,Poster或者Backdrop等 :return: 图片对应在TMDB中的URL """ if not self._plex: return None try: if image_type == "Poster": images = self._plex.fetchItems('/library/metadata/%s/posters' % item_id, cls=media.Poster) else: images = self._plex.fetchItems('/library/metadata/%s/arts' % item_id, cls=media.Art) for image in images: if hasattr(image, 'key') and image.key.startswith('http'): return image.key except Exception as e: logger.error(f"获取封面出错:" + str(e)) return None def refresh_root_library(self) -> bool: """ 通知Plex刷新整个媒体库 """ if not self._plex: return False return self._plex.library.update() def refresh_library_by_items(self, items: List[schemas.RefreshMediaItem]) -> bool: """ 按路径刷新媒体库 item: target_path """ if not self._plex: return False result_dict = {} for item in items: file_path = item.target_path lib_key, path = self.__find_librarie(file_path, self._libraries) # 如果存在同一剧集的多集,key(path)相同会合并 result_dict[path] = lib_key if "" in result_dict: # 如果有匹配失败的,刷新整个库 self._plex.library.update() else: # 否则一个一个刷新 for path, lib_key in result_dict.items(): logger.info(f"刷新媒体库:{lib_key} - {path}") self._plex.query(f'/library/sections/{lib_key}/refresh?path={quote_plus(str(Path(path).parent))}') @staticmethod def __find_librarie(path: Path, libraries: List[Any]) -> Tuple[str, str]: """ 判断这个path属于哪个媒体库 多个媒体库配置的目录不应有重复和嵌套, """ def is_subpath(_path: Path, _parent: Path) -> bool: """ 判断_path是否是_parent的子目录下 """ _path = _path.resolve() _parent = _parent.resolve() return _path.parts[:len(_parent.parts)] == _parent.parts if path is None: return "", "" try: for lib in libraries: if hasattr(lib, "locations") and lib.locations: for location in lib.locations: if is_subpath(path, Path(location)): return lib.key, str(path) except Exception as err: logger.error(f"查找媒体库出错:{str(err)}") return "", "" def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]: """ 获取单个项目详情 """ if not self._plex: return None try: item = self._plex.fetchItem(itemid) ids = self.__get_ids(item.guids) path = None if item.locations: path = item.locations[0] return schemas.MediaServerItem( server="plex", library=item.librarySectionID, item_id=item.key, item_type=item.type, title=item.title, original_title=item.originalTitle, year=item.year, tmdbid=ids['tmdb_id'], imdbid=ids['imdb_id'], tvdbid=ids['tvdb_id'], path=path, ) except Exception as err: logger.error(f"获取项目详情出错:{str(err)}") return None @staticmethod def __get_ids(guids: List[Any]) -> dict: def parse_tmdb_id(value: str) -> (bool, int): """尝试将TMDB ID字符串转换为整数。如果成功,返回(True, int),失败则返回(False, None)。""" try: int_value = int(value) return True, int_value except ValueError: return False, None guid_mapping = { "imdb://": "imdb_id", "tmdb://": "tmdb_id", "tvdb://": "tvdb_id" } ids = {varname: None for varname in guid_mapping.values()} for guid in guids: guid_id = guid['id'] if isinstance(guid, dict) else guid.id for prefix, varname in guid_mapping.items(): if guid_id.startswith(prefix): clean_id = guid_id[len(prefix):] if varname == "tmdb_id": # tmdb_id为int,Plex可能存在脏数据,特别处理tmdb_id success, parsed_id = parse_tmdb_id(clean_id) if success: ids[varname] = parsed_id else: ids[varname] = clean_id break return ids def get_items(self, parent: str) -> Generator: """ 获取媒体服务器所有媒体库列表 """ if not parent: yield None if not self._plex: yield None try: section = self._plex.library.sectionByID(int(parent)) if section: for item in section.all(): try: if not item: continue ids = self.__get_ids(item.guids) path = None if item.locations: path = item.locations[0] yield schemas.MediaServerItem( server="plex", library=item.librarySectionID, item_id=item.key, item_type=item.type, title=item.title, original_title=item.originalTitle, year=item.year, tmdbid=ids['tmdb_id'], imdbid=ids['imdb_id'], tvdbid=ids['tvdb_id'], path=path, ) except Exception as e: logger.error(f"处理媒体项目时出错:{str(e)}, 跳过此项目。") continue except Exception as err: logger.error(f"获取媒体库列表出错:{str(err)}") yield None def get_webhook_message(self, form: any) -> Optional[schemas.WebhookEventInfo]: """ 解析Plex报文 eventItem 字段的含义 event 事件类型 item_type 媒体类型 TV,MOV item_name TV:琅琊榜 S1E6 剖心明志 虎口脱险 MOV:猪猪侠大冒险(2001) overview 剧情描述 { "event": "media.scrobble", "user": false, "owner": true, "Account": { "id": 31646104, "thumb": "https://plex.tv/users/xx", "title": "播放" }, "Server": { "title": "Media-Server", "uuid": "xxxx" }, "Player": { "local": false, "publicAddress": "xx.xx.xx.xx", "title": "MagicBook", "uuid": "wu0uoa1ujfq90t0c5p9f7fw0" }, "Metadata": { "librarySectionType": "show", "ratingKey": "40294", "key": "/library/metadata/40294", "parentRatingKey": "40291", "grandparentRatingKey": "40275", "guid": "plex://episode/615580a9fa828e7f1a0caabd", "parentGuid": "plex://season/615580a9fa828e7f1a0caab8", "grandparentGuid": "plex://show/60e81fd8d8000e002d7d2976", "type": "episode", "title": "The World's Strongest Senior", "titleSort": "World's Strongest Senior", "grandparentKey": "/library/metadata/40275", "parentKey": "/library/metadata/40291", "librarySectionTitle": "动漫剧集", "librarySectionID": 7, "librarySectionKey": "/library/sections/7", "grandparentTitle": "范马刃牙", "parentTitle": "Combat Shadow Fighting Saga / Great Prison Battle Saga", "originalTitle": "Baki Hanma", "contentRating": "TV-MA", "summary": "The world is shaken by news", "index": 1, "parentIndex": 1, "audienceRating": 8.5, "viewCount": 1, "lastViewedAt": 1694320444, "year": 2021, "thumb": "/library/metadata/40294/thumb/1693544504", "art": "/library/metadata/40275/art/1693952979", "parentThumb": "/library/metadata/40291/thumb/1691115271", "grandparentThumb": "/library/metadata/40275/thumb/1693952979", "grandparentArt": "/library/metadata/40275/art/1693952979", "duration": 1500000, "originallyAvailableAt": "2021-09-30", "addedAt": 1691115281, "updatedAt": 1693544504, "audienceRatingImage": "themoviedb://image.rating", "Guid": [ { "id": "imdb://tt14765720" }, { "id": "tmdb://3087250" }, { "id": "tvdb://8530933" } ], "Rating": [ { "image": "themoviedb://image.rating", "value": 8.5, "type": "audience" } ], "Director": [ { "id": 115144, "filter": "director=115144", "tag": "Keiya Saito", "tagKey": "5f401c8d04a86500409ea6c1" } ], "Writer": [ { "id": 115135, "filter": "writer=115135", "tag": "Tatsuhiko Urahata", "tagKey": "5d7768e07a53e9001e6db1ce", "thumb": "https://metadata-static.plex.tv/f/people/f6f90dc89fa87d459f85d40a09720c05.jpg" } ] } } """ if not form: return None payload = form.get("payload") if not payload: return None try: message = json.loads(payload) except Exception as e: logger.debug(f"解析plex webhook出错:{str(e)}") return None eventType = message.get('event') if not eventType: return None logger.debug(f"接收到plex webhook:{message}") eventItem = schemas.WebhookEventInfo(event=eventType, channel="plex") if message.get('Metadata'): if message.get('Metadata', {}).get('type') == 'episode': eventItem.item_type = "TV" eventItem.item_name = "%s %s%s %s" % ( message.get('Metadata', {}).get('grandparentTitle'), "S" + str(message.get('Metadata', {}).get('parentIndex')), "E" + str(message.get('Metadata', {}).get('index')), message.get('Metadata', {}).get('title')) eventItem.item_id = message.get('Metadata', {}).get('ratingKey') eventItem.season_id = message.get('Metadata', {}).get('parentIndex') eventItem.episode_id = message.get('Metadata', {}).get('index') if (message.get('Metadata', {}).get('summary') and len(message.get('Metadata', {}).get('summary')) > 100): eventItem.overview = str(message.get('Metadata', {}).get('summary'))[:100] + "..." else: eventItem.overview = message.get('Metadata', {}).get('summary') else: eventItem.item_type = "MOV" if message.get('Metadata', {}).get('type') == 'movie' else "SHOW" eventItem.item_name = "%s %s" % ( message.get('Metadata', {}).get('title'), "(" + str(message.get('Metadata', {}).get('year')) + ")") eventItem.item_id = message.get('Metadata', {}).get('ratingKey') if len(message.get('Metadata', {}).get('summary')) > 100: eventItem.overview = str(message.get('Metadata', {}).get('summary'))[:100] + "..." else: eventItem.overview = message.get('Metadata', {}).get('summary') if message.get('Player'): eventItem.ip = message.get('Player').get('publicAddress') eventItem.client = message.get('Player').get('title') # 这里给个空,防止拼消息的时候出现None eventItem.device_name = ' ' if message.get('Account'): eventItem.user_name = message.get("Account").get('title') # 获取消息图片 if eventItem.item_id: # 根据返回的item_id去调用媒体服务器获取 eventItem.image_url = self.get_remote_image_by_id(item_id=eventItem.item_id, image_type="Backdrop") return eventItem def get_plex(self): """ 获取plex对象,以便直接操作 """ return self._plex def get_play_url(self, item_id: str) -> str: """ 拼装媒体播放链接 :param item_id: 媒体的的ID """ return f'{self._playhost or self._host}web/index.html#!/server/{self._plex.machineIdentifier}/details?key={item_id}' def get_resume(self, num: int = 12) -> Optional[List[schemas.MediaServerPlayItem]]: """ 获取继续观看的媒体 """ if not self._plex: return [] # 媒体库白名单 allow_library = ",".join([lib.id for lib in self.get_librarys()]) params = {'contentDirectoryID': allow_library} items = self._plex.fetchItems("/hubs/continueWatching/items", container_start=0, container_size=num, params=params) ret_resume = [] for item in items: item_type = MediaType.MOVIE.value if item.TYPE == "movie" else MediaType.TV.value if item_type == MediaType.MOVIE.value: title = item.title subtitle = item.year else: title = item.grandparentTitle subtitle = f"S{item.parentIndex}:E{item.index} - {item.title}" link = self.get_play_url(item.key) image = item.artUrl ret_resume.append(schemas.MediaServerPlayItem( id=item.key, title=title, subtitle=subtitle, type=item_type, image=image, link=link, percent=item.viewOffset / item.duration * 100 if item.viewOffset and item.duration else 0 )) return ret_resume[:num] def get_latest(self, num: int = 20) -> Optional[List[schemas.MediaServerPlayItem]]: """ 获取最近添加媒体 """ if not self._plex: return None # 请求参数(除黑名单) allow_library = ",".join([lib.id for lib in self.get_librarys()]) params = { "contentDirectoryID": allow_library, "count": num, "excludeContinueWatching": 1 } ret_resume = [] sub_result = [] offset = 0 while True: if len(ret_resume) >= num: break # 获取所有资料库 hubs = self._plex.fetchItems( '/hubs/promoted', container_start=offset, container_size=num, maxresults=num, params=params ) if len(hubs) == 0: break # 合并排序 for hub in hubs: for item in hub.items: sub_result.append(item) sub_result.sort(key=lambda x: x.addedAt, reverse=True) for item in sub_result: if len(ret_resume) >= num: break item_type, title, image = "", "", "" if item.TYPE == "movie": item_type = MediaType.MOVIE.value title = item.title image = item.posterUrl elif item.TYPE == "season": item_type = MediaType.TV.value title = "%s 第%s季" % (item.parentTitle, item.index) image = item.posterUrl elif item.TYPE == "episode": item_type = MediaType.TV.value title = "%s 第%s季 第%s集" % (item.grandparentTitle, item.parentIndex, item.index) thumb = (item.parentThumb or item.grandparentThumb or '').lstrip('/') image = (self._host + thumb + f"?X-Plex-Token={self._token}") elif item.TYPE == "show": item_type = MediaType.TV.value title = "%s 共%s季" % (item.title, item.seasonCount) image = item.posterUrl link = self.get_play_url(item.key) ret_resume.append(schemas.MediaServerPlayItem( id=item.key, title=title, subtitle=item.year, type=item_type, image=image, link=link )) offset += num return ret_resume[:num]