Fix bug in MediaSource definintion and enable strict type checking (#58321)

This commit is contained in:
Allen Porter 2021-10-24 02:39:39 -07:00 committed by GitHub
parent 0aa06d22f1
commit 31aa168bbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 55 additions and 38 deletions

View File

@ -72,6 +72,7 @@ homeassistant.components.mailbox.*
homeassistant.components.media_player.*
homeassistant.components.modbus.*
homeassistant.components.modem_callerid.*
homeassistant.components.media_source.*
homeassistant.components.mysensors.*
homeassistant.components.nam.*
homeassistant.components.nanoleaf.*

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from datetime import timedelta
from typing import Any
from urllib.parse import quote
import voluptuous as vol
@ -10,6 +11,7 @@ from homeassistant.components import websocket_api
from homeassistant.components.http.auth import async_sign_path
from homeassistant.components.media_player.const import ATTR_MEDIA_CONTENT_ID
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.websocket_api import ActiveConnection
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.integration_platform import (
async_process_integration_platforms,
@ -24,7 +26,7 @@ from .error import Unresolvable
DEFAULT_EXPIRY_TIME = 3600 * 24
def is_media_source_id(media_content_id: str):
def is_media_source_id(media_content_id: str) -> bool:
"""Test if identifier is a media source."""
return URI_SCHEME_REGEX.match(media_content_id) is not None
@ -52,7 +54,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True
async def _process_media_source_platform(hass, domain, platform):
async def _process_media_source_platform(
hass: HomeAssistant, domain: str, platform: Any
) -> None:
"""Process a media source platform."""
hass.data[DOMAIN][domain] = await platform.async_get_media_source(hass)
@ -93,10 +97,12 @@ async def async_resolve_media(
}
)
@websocket_api.async_response
async def websocket_browse_media(hass, connection, msg):
async def websocket_browse_media(
hass: HomeAssistant, connection: ActiveConnection, msg: dict
) -> None:
"""Browse available media."""
try:
media = await async_browse_media(hass, msg.get("media_content_id"))
media = await async_browse_media(hass, msg.get("media_content_id", ""))
connection.send_result(
msg["id"],
media.as_dict(),
@ -113,7 +119,9 @@ async def websocket_browse_media(hass, connection, msg):
}
)
@websocket_api.async_response
async def websocket_resolve_media(hass, connection, msg):
async def websocket_resolve_media(
hass: HomeAssistant, connection: ActiveConnection, msg: dict
) -> None:
"""Resolve media."""
try:
media = await async_resolve_media(hass, msg["media_content_id"])

View File

@ -18,7 +18,7 @@ from .models import BrowseMediaSource, MediaSource, MediaSourceItem, PlayMedia
@callback
def async_setup(hass: HomeAssistant):
def async_setup(hass: HomeAssistant) -> None:
"""Set up local media source."""
source = LocalSource(hass)
hass.data[DOMAIN][DOMAIN] = source
@ -36,7 +36,7 @@ class LocalSource(MediaSource):
self.hass = hass
@callback
def async_full_path(self, source_dir_id, location) -> Path:
def async_full_path(self, source_dir_id: str, location: str) -> Path:
"""Return full path."""
return Path(self.hass.config.media_dirs[source_dir_id], location)
@ -58,7 +58,7 @@ class LocalSource(MediaSource):
return source_dir_id, location
async def async_resolve_media(self, item: MediaSourceItem) -> str:
async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
"""Resolve media to a url."""
source_dir_id, location = self.async_parse_identifier(item)
if source_dir_id == "" or source_dir_id not in self.hass.config.media_dirs:
@ -67,22 +67,22 @@ class LocalSource(MediaSource):
mime_type, _ = mimetypes.guess_type(
str(self.async_full_path(source_dir_id, location))
)
assert isinstance(mime_type, str)
return PlayMedia(f"/media/{item.identifier}", mime_type)
async def async_browse_media(
self, item: MediaSourceItem, media_types: tuple[str] = MEDIA_MIME_TYPES
) -> BrowseMediaSource:
async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
"""Return media."""
try:
source_dir_id, location = self.async_parse_identifier(item)
except Unresolvable as err:
raise BrowseError(str(err)) from err
return await self.hass.async_add_executor_job(
result = await self.hass.async_add_executor_job(
self._browse_media, source_dir_id, location
)
return result
def _browse_media(self, source_dir_id: str, location: Path):
def _browse_media(self, source_dir_id: str, location: str) -> BrowseMediaSource:
"""Browse media."""
# If only one media dir is configured, use that as the local media root
@ -122,9 +122,14 @@ class LocalSource(MediaSource):
if not full_path.is_dir():
raise BrowseError("Path is not a directory.")
return self._build_item_response(source_dir_id, full_path)
result = self._build_item_response(source_dir_id, full_path)
if not result:
raise BrowseError("Unknown source directory.")
return result
def _build_item_response(self, source_dir_id: str, path: Path, is_child=False):
def _build_item_response(
self, source_dir_id: str, path: Path, is_child: bool = False
) -> BrowseMediaSource | None:
mime_type, _ = mimetypes.guess_type(str(path))
is_file = path.is_file()
is_dir = path.is_dir()
@ -143,9 +148,11 @@ class LocalSource(MediaSource):
if is_dir:
title += "/"
media_class = MEDIA_CLASS_MAP.get(
mime_type and mime_type.split("/")[0], MEDIA_CLASS_DIRECTORY
)
media_class = MEDIA_CLASS_DIRECTORY
if mime_type:
media_class = MEDIA_CLASS_MAP.get(
mime_type.split("/")[0], MEDIA_CLASS_DIRECTORY
)
media = BrowseMediaSource(
domain=DOMAIN,

View File

@ -3,6 +3,7 @@ from __future__ import annotations
from abc import ABC
from dataclasses import dataclass
from typing import Any, cast
from homeassistant.components.media_player import BrowseMedia
from homeassistant.components.media_player.const import (
@ -27,9 +28,11 @@ class PlayMedia:
class BrowseMediaSource(BrowseMedia):
"""Represent a browsable media file."""
children: list[BrowseMediaSource] | None
children: list[BrowseMediaSource | BrowseMedia] | None
def __init__(self, *, domain: str | None, identifier: str | None, **kwargs) -> None:
def __init__(
self, *, domain: str | None, identifier: str | None, **kwargs: Any
) -> None:
"""Initialize media source browse media."""
media_content_id = f"{URI_SCHEME}{domain or ''}"
if identifier:
@ -85,7 +88,7 @@ class MediaSourceItem:
@callback
def async_media_source(self) -> MediaSource:
"""Return media source that owns this item."""
return self.hass.data[DOMAIN][self.domain]
return cast(MediaSource, self.hass.data[DOMAIN][self.domain])
@classmethod
def from_uri(cls, hass: HomeAssistant, uri: str) -> MediaSourceItem:
@ -104,7 +107,7 @@ class MediaSourceItem:
class MediaSource(ABC):
"""Represents a source of media files."""
name: str = None
name: str | None = None
def __init__(self, domain: str) -> None:
"""Initialize a media source."""
@ -116,8 +119,6 @@ class MediaSource(ABC):
"""Resolve a media item to a playable item."""
raise NotImplementedError
async def async_browse_media(
self, item: MediaSourceItem, media_types: tuple[str]
) -> BrowseMediaSource:
async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
"""Browse media."""
raise NotImplementedError

View File

@ -52,11 +52,7 @@ class NetatmoSource(MediaSource):
url = self.events[camera_id][event_id]["media_url"]
return PlayMedia(url, MIME_TYPE)
async def async_browse_media(
self,
item: MediaSourceItem,
media_types: tuple[str] = ("video",),
) -> BrowseMediaSource:
async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
"""Return media."""
try:
source, camera_id, event_id = async_parse_identifier(item)

View File

@ -17,7 +17,6 @@ from homeassistant.components.media_player.const import (
MEDIA_CLASS_IMAGE,
MEDIA_CLASS_VIDEO,
)
from homeassistant.components.media_source.const import MEDIA_MIME_TYPES
from homeassistant.components.media_source.models import (
BrowseMediaSource,
MediaSource,
@ -87,9 +86,7 @@ class XboxSource(MediaSource):
kind = category.split("#", 1)[1]
return PlayMedia(url, MIME_TYPE_MAP[kind])
async def async_browse_media(
self, item: MediaSourceItem, media_types: tuple[str] = MEDIA_MIME_TYPES
) -> BrowseMediaSource:
async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
"""Return media."""
title, category, _ = async_parse_identifier(item)

View File

@ -803,6 +803,17 @@ no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.media_source.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.mysensors.*]
check_untyped_defs = true
disallow_incomplete_defs = true
@ -1692,9 +1703,6 @@ ignore_errors = true
[mypy-homeassistant.components.lyric.*]
ignore_errors = true
[mypy-homeassistant.components.media_source.*]
ignore_errors = true
[mypy-homeassistant.components.melcloud.*]
ignore_errors = true

View File

@ -72,7 +72,6 @@ IGNORED_MODULES: Final[list[str]] = [
"homeassistant.components.luftdaten.*",
"homeassistant.components.lutron_caseta.*",
"homeassistant.components.lyric.*",
"homeassistant.components.media_source.*",
"homeassistant.components.melcloud.*",
"homeassistant.components.meteo_france.*",
"homeassistant.components.metoffice.*",