Bump ruff to 0.3.4 (#112690)

Co-authored-by: Sid <27780930+autinerd@users.noreply.github.com>
Co-authored-by: Marc Mueller <30130371+cdce8p@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Joost Lekkerkerker 2024-03-26 00:02:16 +01:00 committed by GitHub
parent 27219b6962
commit 6bb4e7d62c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1044 changed files with 24245 additions and 16750 deletions

View File

@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.2.1
rev: v0.3.4
hooks:
- id: ruff
args:

View File

@ -41,12 +41,10 @@ class cached_property(Generic[_T]):
)
@overload
def __get__(self, instance: None, owner: type[Any] | None = None) -> Self:
...
def __get__(self, instance: None, owner: type[Any] | None = None) -> Self: ...
@overload
def __get__(self, instance: Any, owner: type[Any] | None = None) -> _T:
...
def __get__(self, instance: Any, owner: type[Any] | None = None) -> _T: ...
def __get__(
self, instance: Any | None, owner: type[Any] | None = None

View File

@ -162,13 +162,13 @@ def _standardize_geography_config_entry(
# about, infer it from the data we have:
entry_updates["data"] = {**entry.data}
if CONF_CITY in entry.data:
entry_updates["data"][
CONF_INTEGRATION_TYPE
] = INTEGRATION_TYPE_GEOGRAPHY_NAME
entry_updates["data"][CONF_INTEGRATION_TYPE] = (
INTEGRATION_TYPE_GEOGRAPHY_NAME
)
else:
entry_updates["data"][
CONF_INTEGRATION_TYPE
] = INTEGRATION_TYPE_GEOGRAPHY_COORDS
entry_updates["data"][CONF_INTEGRATION_TYPE] = (
INTEGRATION_TYPE_GEOGRAPHY_COORDS
)
if not entry_updates:
return

View File

@ -211,9 +211,10 @@ class AmcrestChecker(ApiWrapper):
self, *args: Any, **kwargs: Any
) -> AsyncIterator[httpx.Response]:
"""amcrest.ApiWrapper.command wrapper to catch errors."""
async with self._async_command_wrapper(), super().async_stream_command(
*args, **kwargs
) as ret:
async with (
self._async_command_wrapper(),
super().async_stream_command(*args, **kwargs) as ret,
):
yield ret
@asynccontextmanager

View File

@ -108,21 +108,21 @@ class AmcrestSensor(SensorEntity):
elif sensor_type == SENSOR_SDCARD:
storage = await self._api.async_storage_all
try:
self._attr_extra_state_attributes[
"Total"
] = f"{storage['total'][0]:.2f} {storage['total'][1]}"
self._attr_extra_state_attributes["Total"] = (
f"{storage['total'][0]:.2f} {storage['total'][1]}"
)
except ValueError:
self._attr_extra_state_attributes[
"Total"
] = f"{storage['total'][0]} {storage['total'][1]}"
self._attr_extra_state_attributes["Total"] = (
f"{storage['total'][0]} {storage['total'][1]}"
)
try:
self._attr_extra_state_attributes[
"Used"
] = f"{storage['used'][0]:.2f} {storage['used'][1]}"
self._attr_extra_state_attributes["Used"] = (
f"{storage['used'][0]:.2f} {storage['used'][1]}"
)
except ValueError:
self._attr_extra_state_attributes[
"Used"
] = f"{storage['used'][0]} {storage['used'][1]}"
self._attr_extra_state_attributes["Used"] = (
f"{storage['used'][0]} {storage['used'][1]}"
)
try:
self._attr_native_value = f"{storage['used_percent']:.2f}"
except ValueError:

View File

@ -1,4 +1,5 @@
"""Diagnostics support for APCUPSD."""
from __future__ import annotations
from typing import Any

View File

@ -33,14 +33,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=_service_info_to_adv,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=_service_info_to_adv,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -754,9 +754,9 @@ class PipelineRun:
raise DuplicateWakeUpDetectedError(result.wake_word_phrase)
# Record last wake up time to block duplicate detections
self.hass.data[DATA_LAST_WAKE_UP][
result.wake_word_phrase
] = time.monotonic()
self.hass.data[DATA_LAST_WAKE_UP][result.wake_word_phrase] = (
time.monotonic()
)
if result.queued_audio:
# Add audio that was pending at detection.
@ -1375,9 +1375,9 @@ class PipelineInput:
raise DuplicateWakeUpDetectedError(self.wake_word_phrase)
# Record last wake up time to block duplicate detections
self.run.hass.data[DATA_LAST_WAKE_UP][
self.wake_word_phrase
] = time.monotonic()
self.run.hass.data[DATA_LAST_WAKE_UP][self.wake_word_phrase] = (
time.monotonic()
)
stt_input_stream = stt_processed_stream

View File

@ -101,9 +101,9 @@ class AsusWrtDevice(ScannerEntity):
self._device = self._router.devices[self._device.mac]
self._attr_extra_state_attributes = {}
if self._device.last_activity:
self._attr_extra_state_attributes[
ATTR_LAST_TIME_REACHABLE
] = self._device.last_activity.isoformat(timespec="seconds")
self._attr_extra_state_attributes[ATTR_LAST_TIME_REACHABLE] = (
self._device.last_activity.isoformat(timespec="seconds")
)
self.async_write_ha_state()
async def async_added_to_hass(self) -> None:

View File

@ -141,9 +141,9 @@ class AugustLock(AugustEntityMixin, RestoreEntity, LockEntity):
ATTR_BATTERY_LEVEL: self._detail.battery_level
}
if self._detail.keypad is not None:
self._attr_extra_state_attributes[
"keypad_battery_level"
] = self._detail.keypad.battery_level
self._attr_extra_state_attributes["keypad_battery_level"] = (
self._detail.keypad.battery_level
)
async def async_added_to_hass(self) -> None:
"""Restore ATTR_CHANGED_BY on startup since it is likely no longer in the activity log."""

View File

@ -92,9 +92,10 @@ async def fetch_redirect_uris(hass: HomeAssistant, url: str) -> list[str]:
parser = LinkTagParser("redirect_uri")
chunks = 0
try:
async with aiohttp.ClientSession() as session, session.get(
url, timeout=5
) as resp:
async with (
aiohttp.ClientSession() as session,
session.get(url, timeout=5) as resp,
):
async for data in resp.content.iter_chunked(1024):
parser.feed(data.decode())
chunks += 1

View File

@ -122,9 +122,9 @@ class AwairFlowHandler(ConfigFlow, domain=DOMAIN):
for flow in self._async_in_progress():
if flow["context"]["source"] == SOURCE_ZEROCONF:
info = flow["context"]["title_placeholders"]
entries[
flow["context"]["host"]
] = f"{info['model']} ({info['device_id']})"
entries[flow["context"]["host"]] = (
f"{info['model']} ({info['device_id']})"
)
return entries
async def async_step_local(

View File

@ -2,6 +2,7 @@
Central point to load entities for the different platforms.
"""
from __future__ import annotations
from functools import partial

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = BlueMaestroBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -129,9 +129,9 @@ class PassiveBluetoothDataUpdate(Generic[_T]):
"""Generic bluetooth data."""
devices: dict[str | None, DeviceInfo] = dataclasses.field(default_factory=dict)
entity_descriptions: dict[
PassiveBluetoothEntityKey, EntityDescription
] = dataclasses.field(default_factory=dict)
entity_descriptions: dict[PassiveBluetoothEntityKey, EntityDescription] = (
dataclasses.field(default_factory=dict)
)
entity_names: dict[PassiveBluetoothEntityKey, str | None] = dataclasses.field(
default_factory=dict
)

View File

@ -29,14 +29,14 @@ def async_load_history_from_system(
not (existing_all := connectable_loaded_history.get(address))
or history.advertisement_data.rssi > existing_all.rssi
):
connectable_loaded_history[address] = all_loaded_history[
address
] = BluetoothServiceInfoBleak.from_device_and_advertisement_data(
history.device,
history.advertisement_data,
history.source,
now_monotonic,
True,
connectable_loaded_history[address] = all_loaded_history[address] = (
BluetoothServiceInfoBleak.from_device_and_advertisement_data(
history.device,
history.advertisement_data,
history.source,
now_monotonic,
True,
)
)
# Restore remote adapters

View File

@ -102,8 +102,8 @@ class BMWLock(BMWBaseEntity, LockEntity):
LockState.LOCKED,
LockState.SECURED,
}
self._attr_extra_state_attributes[
"door_lock_state"
] = self.vehicle.doors_and_windows.door_lock_state.value
self._attr_extra_state_attributes["door_lock_state"] = (
self.vehicle.doors_and_windows.door_lock_state.value
)
super()._handle_coordinator_update()

View File

@ -76,9 +76,9 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
await hass.async_add_executor_job(session.stop_polling)
await hass.async_add_executor_job(session.start_polling)
hass.data[DOMAIN][entry.entry_id][
DATA_POLLING_HANDLER
] = hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_polling)
hass.data[DOMAIN][entry.entry_id][DATA_POLLING_HANDLER] = (
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_polling)
)
return True

View File

@ -63,9 +63,9 @@ class BrottsplatskartanSensor(SensorEntity):
"""Update device state."""
incident_counts: defaultdict[str, int] = defaultdict(int)
get_incidents: dict[str, list] | Literal[
False
] = self._brottsplatskartan.get_incidents()
get_incidents: dict[str, list] | Literal[False] = (
self._brottsplatskartan.get_incidents()
)
if get_incidents is False:
LOGGER.debug("Problems fetching incidents")

View File

@ -129,20 +129,22 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
data = BTHomeBluetoothDeviceData(**kwargs)
device_registry = async_get(hass)
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = BTHomePassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=lambda service_info: process_service_info(
hass, entry, data, service_info, device_registry
),
device_data=data,
discovered_event_classes=set(entry.data.get(CONF_DISCOVERED_EVENT_CLASSES, [])),
connectable=False,
entry=entry,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
BTHomePassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=lambda service_info: process_service_info(
hass, entry, data, service_info, device_registry
),
device_data=data,
discovered_event_classes=set(
entry.data.get(CONF_DISCOVERED_EVENT_CLASSES, [])
),
connectable=False,
entry=entry,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

View File

@ -46,9 +46,9 @@ class CanaryDataUpdateCoordinator(DataUpdateCoordinator[CanaryData]):
for device in location.devices:
if device.is_online:
readings_by_device_id[
device.device_id
] = self.canary.get_latest_readings(device.device_id)
readings_by_device_id[device.device_id] = (
self.canary.get_latest_readings(device.device_id)
)
return {
"locations": locations_by_id,

View File

@ -140,13 +140,13 @@ async def async_attach_trigger(
}
if trigger_type == "current_temperature_changed":
numeric_state_config[
numeric_state_trigger.CONF_VALUE_TEMPLATE
] = "{{ state.attributes.current_temperature }}"
numeric_state_config[numeric_state_trigger.CONF_VALUE_TEMPLATE] = (
"{{ state.attributes.current_temperature }}"
)
else: # trigger_type == "current_humidity_changed"
numeric_state_config[
numeric_state_trigger.CONF_VALUE_TEMPLATE
] = "{{ state.attributes.current_humidity }}"
numeric_state_config[numeric_state_trigger.CONF_VALUE_TEMPLATE] = (
"{{ state.attributes.current_humidity }}"
)
if CONF_ABOVE in config:
numeric_state_config[CONF_ABOVE] = config[CONF_ABOVE]

View File

@ -29,9 +29,9 @@ class CO2SensorEntityDescription(SensorEntityDescription):
# For backwards compat, allow description to override unique ID key to use
unique_id: str | None = None
unit_of_measurement_fn: Callable[
[CarbonIntensityResponse], str | None
] | None = None
unit_of_measurement_fn: Callable[[CarbonIntensityResponse], str | None] | None = (
None
)
value_fn: Callable[[CarbonIntensityResponse], float | None]

View File

@ -178,9 +178,9 @@ class DaikinClimate(ClimateEntity):
# temperature
elif attr == ATTR_TEMPERATURE:
try:
values[
HA_ATTR_TO_DAIKIN[ATTR_TARGET_TEMPERATURE]
] = format_target_temperature(value)
values[HA_ATTR_TO_DAIKIN[ATTR_TARGET_TEMPERATURE]] = (
format_target_temperature(value)
)
except ValueError:
_LOGGER.error("Invalid temperature %s", value)

View File

@ -6,7 +6,7 @@ from asyncio import Event, get_running_loop
import logging
from threading import Thread
import debugpy
import debugpy # noqa: T100
import voluptuous as vol
from homeassistant.const import CONF_HOST, CONF_PORT
@ -60,7 +60,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
ready = Event()
def waitfor():
debugpy.wait_for_client()
debugpy.wait_for_client() # noqa: T100
hass.loop.call_soon_threadsafe(ready.set)
Thread(target=waitfor).start()

View File

@ -61,7 +61,12 @@ async def async_setup_events(hub: DeconzHub) -> None:
@callback
def async_add_sensor(_: EventType, sensor_id: str) -> None:
"""Create DeconzEvent."""
new_event: DeconzAlarmEvent | DeconzEvent | DeconzPresenceEvent | DeconzRelativeRotaryEvent
new_event: (
DeconzAlarmEvent
| DeconzEvent
| DeconzPresenceEvent
| DeconzRelativeRotaryEvent
)
sensor = hub.api.sensors[sensor_id]
if isinstance(sensor, Switch):

View File

@ -134,8 +134,7 @@ async def async_get_device_automation_platform(
hass: HomeAssistant,
domain: str,
automation_type: Literal[DeviceAutomationType.TRIGGER],
) -> DeviceAutomationTriggerProtocol:
...
) -> DeviceAutomationTriggerProtocol: ...
@overload
@ -143,8 +142,7 @@ async def async_get_device_automation_platform(
hass: HomeAssistant,
domain: str,
automation_type: Literal[DeviceAutomationType.CONDITION],
) -> DeviceAutomationConditionProtocol:
...
) -> DeviceAutomationConditionProtocol: ...
@overload
@ -152,15 +150,13 @@ async def async_get_device_automation_platform(
hass: HomeAssistant,
domain: str,
automation_type: Literal[DeviceAutomationType.ACTION],
) -> DeviceAutomationActionProtocol:
...
) -> DeviceAutomationActionProtocol: ...
@overload
async def async_get_device_automation_platform(
hass: HomeAssistant, domain: str, automation_type: DeviceAutomationType
) -> DeviceAutomationPlatformType:
...
) -> DeviceAutomationPlatformType: ...
async def async_get_device_automation_platform(

View File

@ -28,9 +28,9 @@ async def async_setup_entry(
) -> None:
"""Get all devices and sensors and setup them via config entry."""
device: Device = hass.data[DOMAIN][entry.entry_id]["device"]
coordinators: dict[
str, DataUpdateCoordinator[list[ConnectedStationInfo]]
] = hass.data[DOMAIN][entry.entry_id]["coordinators"]
coordinators: dict[str, DataUpdateCoordinator[list[ConnectedStationInfo]]] = (
hass.data[DOMAIN][entry.entry_id]["coordinators"]
)
registry = er.async_get(hass)
tracked = set()

View File

@ -41,13 +41,17 @@ CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
class DiagnosticsPlatformData:
"""Diagnostic platform data."""
config_entry_diagnostics: Callable[
[HomeAssistant, ConfigEntry], Coroutine[Any, Any, Mapping[str, Any]]
] | None
device_diagnostics: Callable[
[HomeAssistant, ConfigEntry, DeviceEntry],
Coroutine[Any, Any, Mapping[str, Any]],
] | None
config_entry_diagnostics: (
Callable[[HomeAssistant, ConfigEntry], Coroutine[Any, Any, Mapping[str, Any]]]
| None
)
device_diagnostics: (
Callable[
[HomeAssistant, ConfigEntry, DeviceEntry],
Coroutine[Any, Any, Mapping[str, Any]],
]
| None
)
@dataclass(slots=True)

View File

@ -18,8 +18,7 @@ def async_redact_data(data: Mapping, to_redact: Iterable[Any]) -> dict: # type:
@overload
def async_redact_data(data: _T, to_redact: Iterable[Any]) -> _T:
...
def async_redact_data(data: _T, to_redact: Iterable[Any]) -> _T: ...
@callback

View File

@ -1,6 +1,5 @@
"""Event module."""
from deebot_client.capabilities import Capabilities, CapabilityEvent
from deebot_client.device import Device
from deebot_client.events import CleanJobStatus, ReportStatsEvent

View File

@ -238,9 +238,9 @@ class EnturPublicTransportSensor(SensorEntity):
self._attributes[ATTR_NEXT_UP_AT] = calls[1].expected_departure_time.strftime(
"%H:%M"
)
self._attributes[
ATTR_NEXT_UP_IN
] = f"{due_in_minutes(calls[1].expected_departure_time)} min"
self._attributes[ATTR_NEXT_UP_IN] = (
f"{due_in_minutes(calls[1].expected_departure_time)} min"
)
self._attributes[ATTR_NEXT_UP_REALTIME] = calls[1].is_realtime
self._attributes[ATTR_NEXT_UP_DELAY] = calls[1].delay_in_min

View File

@ -40,21 +40,21 @@ from .entity import (
)
from .enum_mapper import EsphomeEnumMapper
_ESPHOME_ACP_STATE_TO_HASS_STATE: EsphomeEnumMapper[
AlarmControlPanelState, str
] = EsphomeEnumMapper(
{
AlarmControlPanelState.DISARMED: STATE_ALARM_DISARMED,
AlarmControlPanelState.ARMED_HOME: STATE_ALARM_ARMED_HOME,
AlarmControlPanelState.ARMED_AWAY: STATE_ALARM_ARMED_AWAY,
AlarmControlPanelState.ARMED_NIGHT: STATE_ALARM_ARMED_NIGHT,
AlarmControlPanelState.ARMED_VACATION: STATE_ALARM_ARMED_VACATION,
AlarmControlPanelState.ARMED_CUSTOM_BYPASS: STATE_ALARM_ARMED_CUSTOM_BYPASS,
AlarmControlPanelState.PENDING: STATE_ALARM_PENDING,
AlarmControlPanelState.ARMING: STATE_ALARM_ARMING,
AlarmControlPanelState.DISARMING: STATE_ALARM_DISARMING,
AlarmControlPanelState.TRIGGERED: STATE_ALARM_TRIGGERED,
}
_ESPHOME_ACP_STATE_TO_HASS_STATE: EsphomeEnumMapper[AlarmControlPanelState, str] = (
EsphomeEnumMapper(
{
AlarmControlPanelState.DISARMED: STATE_ALARM_DISARMED,
AlarmControlPanelState.ARMED_HOME: STATE_ALARM_ARMED_HOME,
AlarmControlPanelState.ARMED_AWAY: STATE_ALARM_ARMED_AWAY,
AlarmControlPanelState.ARMED_NIGHT: STATE_ALARM_ARMED_NIGHT,
AlarmControlPanelState.ARMED_VACATION: STATE_ALARM_ARMED_VACATION,
AlarmControlPanelState.ARMED_CUSTOM_BYPASS: STATE_ALARM_ARMED_CUSTOM_BYPASS,
AlarmControlPanelState.PENDING: STATE_ALARM_PENDING,
AlarmControlPanelState.ARMING: STATE_ALARM_ARMING,
AlarmControlPanelState.DISARMING: STATE_ALARM_DISARMING,
AlarmControlPanelState.TRIGGERED: STATE_ALARM_TRIGGERED,
}
)
)

View File

@ -166,14 +166,14 @@ def convert_api_error_ha_error(
ICON_SCHEMA = vol.Schema(cv.icon)
ENTITY_CATEGORIES: EsphomeEnumMapper[
EsphomeEntityCategory, EntityCategory | None
] = EsphomeEnumMapper(
{
EsphomeEntityCategory.NONE: None,
EsphomeEntityCategory.CONFIG: EntityCategory.CONFIG,
EsphomeEntityCategory.DIAGNOSTIC: EntityCategory.DIAGNOSTIC,
}
ENTITY_CATEGORIES: EsphomeEnumMapper[EsphomeEntityCategory, EntityCategory | None] = (
EsphomeEnumMapper(
{
EsphomeEntityCategory.NONE: None,
EsphomeEntityCategory.CONFIG: EntityCategory.CONFIG,
EsphomeEntityCategory.DIAGNOSTIC: EntityCategory.DIAGNOSTIC,
}
)
)

View File

@ -21,12 +21,10 @@ class EsphomeEnumMapper(Generic[_EnumT, _ValT]):
self._inverse: dict[_ValT, _EnumT] = {v: k for k, v in mapping.items()}
@overload
def from_esphome(self, value: _EnumT) -> _ValT:
...
def from_esphome(self, value: _EnumT) -> _ValT: ...
@overload
def from_esphome(self, value: _EnumT | None) -> _ValT | None:
...
def from_esphome(self, value: _EnumT | None) -> _ValT | None: ...
def from_esphome(self, value: _EnumT | None) -> _ValT | None:
"""Convert from an esphome int representation to a hass string."""

View File

@ -52,15 +52,15 @@ async def async_setup_entry(
)
_STATE_CLASSES: EsphomeEnumMapper[
EsphomeSensorStateClass, SensorStateClass | None
] = EsphomeEnumMapper(
{
EsphomeSensorStateClass.NONE: None,
EsphomeSensorStateClass.MEASUREMENT: SensorStateClass.MEASUREMENT,
EsphomeSensorStateClass.TOTAL_INCREASING: SensorStateClass.TOTAL_INCREASING,
EsphomeSensorStateClass.TOTAL: SensorStateClass.TOTAL,
}
_STATE_CLASSES: EsphomeEnumMapper[EsphomeSensorStateClass, SensorStateClass | None] = (
EsphomeEnumMapper(
{
EsphomeSensorStateClass.NONE: None,
EsphomeSensorStateClass.MEASUREMENT: SensorStateClass.MEASUREMENT,
EsphomeSensorStateClass.TOTAL_INCREASING: SensorStateClass.TOTAL_INCREASING,
EsphomeSensorStateClass.TOTAL: SensorStateClass.TOTAL,
}
)
)

View File

@ -154,9 +154,9 @@ class FileUploadView(HomeAssistantView):
file_upload_data: FileUploadData = hass.data[DOMAIN]
file_dir = file_upload_data.file_dir(file_id)
queue: SimpleQueue[
tuple[bytes, asyncio.Future[None] | None] | None
] = SimpleQueue()
queue: SimpleQueue[tuple[bytes, asyncio.Future[None] | None] | None] = (
SimpleQueue()
)
def _sync_queue_consumer() -> None:
file_dir.mkdir()

View File

@ -96,9 +96,9 @@ def fill_in_schema_dict(some_input):
schema_dict = {}
for field, _type in DATA_SCHEMA_DICT.items():
if some_input.get(str(field)):
schema_dict[
vol.Optional(str(field), default=some_input[str(field)])
] = _type
schema_dict[vol.Optional(str(field), default=some_input[str(field)])] = (
_type
)
else:
schema_dict[field] = _type
return schema_dict

View File

@ -127,9 +127,9 @@ async def async_setup_entry(
forked_daapd_updater = ForkedDaapdUpdater(
hass, forked_daapd_api, config_entry.entry_id
)
hass.data[DOMAIN][config_entry.entry_id][
HASS_DATA_UPDATER_KEY
] = forked_daapd_updater
hass.data[DOMAIN][config_entry.entry_id][HASS_DATA_UPDATER_KEY] = (
forked_daapd_updater
)
await forked_daapd_updater.async_init()
@ -956,9 +956,9 @@ class ForkedDaapdUpdater:
if not {"outputs", "volume"}.isdisjoint(update_types): # update outputs
if outputs := await self._api.get_request("outputs"):
outputs = outputs["outputs"]
update_events[
"outputs"
] = asyncio.Event() # only for master, zones should ignore
update_events["outputs"] = (
asyncio.Event()
) # only for master, zones should ignore
async_dispatcher_send(
self.hass,
SIGNAL_UPDATE_OUTPUTS.format(self._entry_id),

View File

@ -78,9 +78,9 @@ class FroniusCoordinatorBase(
for solar_net_id in data:
if solar_net_id not in self.unregistered_descriptors:
# id seen for the first time
self.unregistered_descriptors[
solar_net_id
] = self.valid_descriptions.copy()
self.unregistered_descriptors[solar_net_id] = (
self.valid_descriptions.copy()
)
return data
@callback
@ -115,9 +115,9 @@ class FroniusCoordinatorBase(
solar_net_id=solar_net_id,
)
)
self.unregistered_descriptors[
solar_net_id
] = remaining_unregistered_descriptors
self.unregistered_descriptors[solar_net_id] = (
remaining_unregistered_descriptors
)
async_add_entities(new_entities)
_add_entities_for_unregistered_descriptors()

View File

@ -162,9 +162,9 @@ class GeoRssServiceSensor(SensorEntity):
# And now compute the attributes from the filtered events.
matrix = {}
for entry in feed_entries:
matrix[
entry.title
] = f"{entry.distance_to_home:.0f}{UnitOfLength.KILOMETERS}"
matrix[entry.title] = (
f"{entry.distance_to_home:.0f}{UnitOfLength.KILOMETERS}"
)
self._state_attributes = matrix
elif status == UPDATE_OK_NO_DATA:
_LOGGER.debug("Update successful, but no data received from %s", self._feed)

View File

@ -30,9 +30,9 @@ async def async_setup_entry(
async_add_entities([GeofencyEntity(device, gps, location_name, attributes)])
hass.data[GF_DOMAIN]["unsub_device_tracker"][
config_entry.entry_id
] = async_dispatcher_connect(hass, TRACKER_UPDATE, _receive_data)
hass.data[GF_DOMAIN]["unsub_device_tracker"][config_entry.entry_id] = (
async_dispatcher_connect(hass, TRACKER_UPDATE, _receive_data)
)
# Restore previously loaded devices
dev_reg = dr.async_get(hass)

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = GoveeBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -40,9 +40,9 @@ async def async_setup_entry(
async_add_entities([GPSLoggerEntity(device, gps, battery, accuracy, attrs)])
hass.data[GPL_DOMAIN]["unsub_device_tracker"][
entry.entry_id
] = async_dispatcher_connect(hass, TRACKER_UPDATE, _receive_data)
hass.data[GPL_DOMAIN]["unsub_device_tracker"][entry.entry_id] = (
async_dispatcher_connect(hass, TRACKER_UPDATE, _receive_data)
)
# Restore previously loaded devices
dev_reg = dr.async_get(hass)

View File

@ -737,10 +737,10 @@ class GTFSDepartureSensor(SensorEntity):
self._attributes[ATTR_LOCATION_DESTINATION] = LOCATION_TYPE_OPTIONS.get(
self._destination.location_type, LOCATION_TYPE_DEFAULT
)
self._attributes[
ATTR_WHEELCHAIR_DESTINATION
] = WHEELCHAIR_BOARDING_OPTIONS.get(
self._destination.wheelchair_boarding, WHEELCHAIR_BOARDING_DEFAULT
self._attributes[ATTR_WHEELCHAIR_DESTINATION] = (
WHEELCHAIR_BOARDING_OPTIONS.get(
self._destination.wheelchair_boarding, WHEELCHAIR_BOARDING_DEFAULT
)
)
# Manage Route metadata

View File

@ -139,16 +139,16 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
(API_VALVE_STATUS, client.valve.status),
(API_WIFI_STATUS, client.wifi.status),
):
coordinator = valve_controller_coordinators[
api
] = GuardianDataUpdateCoordinator(
hass,
entry=entry,
client=client,
api_name=api,
api_coro=api_coro,
api_lock=api_lock,
valve_controller_uid=entry.data[CONF_UID],
coordinator = valve_controller_coordinators[api] = (
GuardianDataUpdateCoordinator(
hass,
entry=entry,
client=client,
api_name=api,
api_coro=api_coro,
api_lock=api_lock,
valve_controller_uid=entry.data[CONF_UID],
)
)
init_valve_controller_tasks.append(async_init_coordinator(coordinator))

View File

@ -148,9 +148,9 @@ class HassIOView(HomeAssistantView):
return web.Response(status=HTTPStatus.UNAUTHORIZED)
if authorized:
headers[
AUTHORIZATION
] = f"Bearer {os.environ.get('SUPERVISOR_TOKEN', '')}"
headers[AUTHORIZATION] = (
f"Bearer {os.environ.get('SUPERVISOR_TOKEN', '')}"
)
if request.method == "POST":
headers[CONTENT_TYPE] = request.content_type

View File

@ -161,9 +161,9 @@ class HeosMediaPlayer(MediaPlayerEntity):
async_dispatcher_connect(self.hass, SIGNAL_HEOS_UPDATED, self._heos_updated)
)
# Register this player's entity_id so it can be resolved by the group manager
self.hass.data[HEOS_DOMAIN][DATA_ENTITY_ID_MAP][
self._player.player_id
] = self.entity_id
self.hass.data[HEOS_DOMAIN][DATA_ENTITY_ID_MAP][self._player.player_id] = (
self.entity_id
)
async_dispatcher_send(self.hass, SIGNAL_HEOS_PLAYER_ADDED)
@log_command_error("clear playlist")

View File

@ -29,15 +29,13 @@ def require_admin(
) -> Callable[
[_FuncType[_HomeAssistantViewT, _P, _ResponseT]],
_FuncType[_HomeAssistantViewT, _P, _ResponseT],
]:
...
]: ...
@overload
def require_admin(
_func: _FuncType[_HomeAssistantViewT, _P, _ResponseT],
) -> _FuncType[_HomeAssistantViewT, _P, _ResponseT]:
...
) -> _FuncType[_HomeAssistantViewT, _P, _ResponseT]: ...
def require_admin(

View File

@ -126,13 +126,13 @@ async def async_attach_trigger(
),
}
if trigger_type == "target_humidity_changed":
numeric_state_config[
numeric_state_trigger.CONF_VALUE_TEMPLATE
] = "{{ state.attributes.humidity }}"
numeric_state_config[numeric_state_trigger.CONF_VALUE_TEMPLATE] = (
"{{ state.attributes.humidity }}"
)
else: # trigger_type == "current_humidity_changed"
numeric_state_config[
numeric_state_trigger.CONF_VALUE_TEMPLATE
] = "{{ state.attributes.current_humidity }}"
numeric_state_config[numeric_state_trigger.CONF_VALUE_TEMPLATE] = (
"{{ state.attributes.current_humidity }}"
)
if CONF_ABOVE in config:
numeric_state_config[CONF_ABOVE] = config[CONF_ABOVE]

View File

@ -1,4 +1,5 @@
"""The Hunter Douglas PowerView integration."""
import logging
from aiopvapi.helpers.aiorequest import AioRequest

View File

@ -150,9 +150,9 @@ class IcloudAccount:
self._family_members_fullname = {}
if user_info.get("membersInfo") is not None:
for prs_id, member in user_info["membersInfo"].items():
self._family_members_fullname[
prs_id
] = f"{member['firstName']} {member['lastName']}"
self._family_members_fullname[prs_id] = (
f"{member['firstName']} {member['lastName']}"
)
self._devices = {}
self.update_devices()

View File

@ -63,8 +63,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
coordinator: ImapPushDataUpdateCoordinator | ImapPollingDataUpdateCoordinator = hass.data[
DOMAIN
].pop(entry.entry_id)
coordinator: (
ImapPushDataUpdateCoordinator | ImapPollingDataUpdateCoordinator
) = hass.data[DOMAIN].pop(entry.entry_id)
await coordinator.shutdown()
return unload_ok

View File

@ -513,9 +513,9 @@ class InfluxThread(threading.Thread):
def __init__(self, hass, influx, event_to_json, max_tries):
"""Initialize the listener."""
threading.Thread.__init__(self, name=DOMAIN)
self.queue: queue.SimpleQueue[
threading.Event | tuple[float, Event] | None
] = queue.SimpleQueue()
self.queue: queue.SimpleQueue[threading.Event | tuple[float, Event] | None] = (
queue.SimpleQueue()
)
self.influx = influx
self.event_to_json = event_to_json
self.max_tries = max_tries

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = INKBIRDBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -212,12 +212,12 @@ class ForecastSensor(IQVIAEntity, SensorEntity):
if not outlook_coordinator.last_update_success:
return
self._attr_extra_state_attributes[
ATTR_OUTLOOK
] = outlook_coordinator.data.get("Outlook")
self._attr_extra_state_attributes[
ATTR_SEASON
] = outlook_coordinator.data.get("Season")
self._attr_extra_state_attributes[ATTR_OUTLOOK] = (
outlook_coordinator.data.get("Outlook")
)
self._attr_extra_state_attributes[ATTR_SEASON] = (
outlook_coordinator.data.get("Season")
)
class IndexSensor(IQVIAEntity, SensorEntity):
@ -283,8 +283,8 @@ class IndexSensor(IQVIAEntity, SensorEntity):
)
elif self.entity_description.key == TYPE_DISEASE_TODAY:
for attrs in period["Triggers"]:
self._attr_extra_state_attributes[
f"{attrs['Name'].lower()}_index"
] = attrs["Index"]
self._attr_extra_state_attributes[f"{attrs['Name'].lower()}_index"] = (
attrs["Index"]
)
self._attr_native_value = period["Index"]

View File

@ -317,9 +317,9 @@ def _generate_device_info(node: Node) -> DeviceInfo:
and node.zwave_props
and node.zwave_props.mfr_id != "0"
):
device_info[
ATTR_MANUFACTURER
] = f"Z-Wave MfrID:{int(node.zwave_props.mfr_id):#0{6}x}"
device_info[ATTR_MANUFACTURER] = (
f"Z-Wave MfrID:{int(node.zwave_props.mfr_id):#0{6}x}"
)
model += (
f"Type:{int(node.zwave_props.prod_type_id):#0{6}x} "
f"Product:{int(node.zwave_props.product_id):#0{6}x}"

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = KegtronBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = LeaoneBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -63,10 +63,13 @@ class LidarrSensorEntityDescription(
"""Class to describe a Lidarr sensor."""
attributes_fn: Callable[[T], dict[str, str] | None] = lambda _: None
description_fn: Callable[
[LidarrSensorEntityDescription[T], LidarrRootFolder],
tuple[LidarrSensorEntityDescription[T], str] | None,
] | None = None
description_fn: (
Callable[
[LidarrSensorEntityDescription[T], LidarrRootFolder],
tuple[LidarrSensorEntityDescription[T], str] | None,
]
| None
) = None
SENSOR_TYPES: dict[str, LidarrSensorEntityDescription[Any]] = {

View File

@ -1218,9 +1218,9 @@ class LightEntity(ToggleEntity, cached_properties=CACHED_PROPERTIES_WITH_ATTR_):
color_temp_kelvin = self.color_temp_kelvin
data[ATTR_COLOR_TEMP_KELVIN] = color_temp_kelvin
if color_temp_kelvin:
data[
ATTR_COLOR_TEMP
] = color_util.color_temperature_kelvin_to_mired(color_temp_kelvin)
data[ATTR_COLOR_TEMP] = (
color_util.color_temperature_kelvin_to_mired(color_temp_kelvin)
)
else:
data[ATTR_COLOR_TEMP] = None
else:
@ -1233,9 +1233,9 @@ class LightEntity(ToggleEntity, cached_properties=CACHED_PROPERTIES_WITH_ATTR_):
color_temp_kelvin = self.color_temp_kelvin
data[ATTR_COLOR_TEMP_KELVIN] = color_temp_kelvin
if color_temp_kelvin:
data[
ATTR_COLOR_TEMP
] = color_util.color_temperature_kelvin_to_mired(color_temp_kelvin)
data[ATTR_COLOR_TEMP] = (
color_util.color_temperature_kelvin_to_mired(color_temp_kelvin)
)
else:
data[ATTR_COLOR_TEMP] = None
else:

View File

@ -24,9 +24,9 @@ async def async_setup_entry(
async_add_entities([LocativeEntity(device, location, location_name)])
hass.data[LT_DOMAIN]["unsub_device_tracker"][
entry.entry_id
] = async_dispatcher_connect(hass, TRACKER_UPDATE, _receive_data)
hass.data[LT_DOMAIN]["unsub_device_tracker"][entry.entry_id] = (
async_dispatcher_connect(hass, TRACKER_UPDATE, _receive_data)
)
class LocativeEntity(TrackerEntity):

View File

@ -166,9 +166,9 @@ def parse_species(species_data):
species_dict["code"] = species["@SpeciesCode"]
species_dict["quality"] = species["@AirQualityBand"]
species_dict["index"] = species["@AirQualityIndex"]
species_dict[
"summary"
] = f"{species_dict['code']} is {species_dict['quality']}"
species_dict["summary"] = (
f"{species_dict['code']} is {species_dict['quality']}"
)
parsed_species_data.append(species_dict)
quality_list.append(species_dict["quality"])
return parsed_species_data, quality_list

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = MoatBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -602,9 +602,9 @@ async def webhook_register_sensor(
async_dispatcher_send(hass, f"{SIGNAL_SENSOR_UPDATE}-{unique_store_key}", data)
else:
data[CONF_UNIQUE_ID] = unique_store_key
data[
CONF_NAME
] = f"{config_entry.data[ATTR_DEVICE_NAME]} {data[ATTR_SENSOR_NAME]}"
data[CONF_NAME] = (
f"{config_entry.data[ATTR_DEVICE_NAME]} {data[ATTR_SENSOR_NAME]}"
)
register_signal = f"{DOMAIN}_{data[ATTR_SENSOR_TYPE]}_register"
async_dispatcher_send(hass, register_signal, data)

View File

@ -258,7 +258,9 @@ class ModbusHub:
"""Initialize the Modbus hub."""
# generic configuration
self._client: AsyncModbusSerialClient | AsyncModbusTcpClient | AsyncModbusUdpClient | None = None
self._client: (
AsyncModbusSerialClient | AsyncModbusTcpClient | AsyncModbusUdpClient | None
) = None
self._async_cancel_listener: Callable[[], None] | None = None
self._in_error = False
self._lock = asyncio.Lock()

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = MopekaIOTBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -398,9 +398,9 @@ class MQTTOptionsFlowHandler(OptionsFlow):
# build form
fields: OrderedDict[vol.Marker, Any] = OrderedDict()
fields[vol.Optional(CONF_DISCOVERY, default=discovery)] = BOOLEAN_SELECTOR
fields[
vol.Optional(CONF_DISCOVERY_PREFIX, default=discovery_prefix)
] = PUBLISH_TOPIC_SELECTOR
fields[vol.Optional(CONF_DISCOVERY_PREFIX, default=discovery_prefix)] = (
PUBLISH_TOPIC_SELECTOR
)
# Birth message is disabled if CONF_BIRTH_MESSAGE = {}
fields[
@ -421,9 +421,9 @@ class MQTTOptionsFlowHandler(OptionsFlow):
)
] = TEXT_SELECTOR
fields[vol.Optional("birth_qos", default=birth[ATTR_QOS])] = QOS_SELECTOR
fields[
vol.Optional("birth_retain", default=birth[ATTR_RETAIN])
] = BOOLEAN_SELECTOR
fields[vol.Optional("birth_retain", default=birth[ATTR_RETAIN])] = (
BOOLEAN_SELECTOR
)
# Will message is disabled if CONF_WILL_MESSAGE = {}
fields[
@ -444,9 +444,9 @@ class MQTTOptionsFlowHandler(OptionsFlow):
)
] = TEXT_SELECTOR
fields[vol.Optional("will_qos", default=will[ATTR_QOS])] = QOS_SELECTOR
fields[
vol.Optional("will_retain", default=will[ATTR_RETAIN])
] = BOOLEAN_SELECTOR
fields[vol.Optional("will_retain", default=will[ATTR_RETAIN])] = (
BOOLEAN_SELECTOR
)
return self.async_show_form(
step_id="options",

View File

@ -1055,16 +1055,16 @@ class MqttDiscoveryUpdate(Entity):
if self._discovery_data is not None:
discovery_hash: tuple[str, str] = self._discovery_data[ATTR_DISCOVERY_HASH]
if self.registry_entry is not None:
self._registry_hooks[
discovery_hash
] = async_track_entity_registry_updated_event(
self.hass,
self.entity_id,
partial(
async_clear_discovery_topic_if_entity_removed,
self._registry_hooks[discovery_hash] = (
async_track_entity_registry_updated_event(
self.hass,
self._discovery_data,
),
self.entity_id,
partial(
async_clear_discovery_topic_if_entity_removed,
self.hass,
self._discovery_data,
),
)
)
stop_discovery_updates(self.hass, self._discovery_data)
send_discovery_done(self.hass, self._discovery_data)

View File

@ -41,14 +41,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return state
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = update_coordinator.DataUpdateCoordinator(
hass,
logging.getLogger(__name__),
name=DOMAIN,
update_interval=UPDATE_INTERVAL_NOT_IN_MEETING,
update_method=update_data,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
update_coordinator.DataUpdateCoordinator(
hass,
logging.getLogger(__name__),
name=DOMAIN,
update_interval=UPDATE_INTERVAL_NOT_IN_MEETING,
update_method=update_data,
)
)
await coordinator.async_config_entry_first_refresh()

View File

@ -302,9 +302,9 @@ class MySensorsConfigFlowHandler(ConfigFlow, domain=DOMAIN):
except vol.Invalid:
errors[CONF_PERSISTENCE_FILE] = "invalid_persistence_file"
else:
real_persistence_path = user_input[
CONF_PERSISTENCE_FILE
] = self._normalize_persistence_file(user_input[CONF_PERSISTENCE_FILE])
real_persistence_path = user_input[CONF_PERSISTENCE_FILE] = (
self._normalize_persistence_file(user_input[CONF_PERSISTENCE_FILE])
)
for other_entry in self._async_current_entries():
if CONF_PERSISTENCE_FILE not in other_entry.data:
continue

View File

@ -279,10 +279,8 @@ async def _gw_start(
gateway.on_conn_made = gateway_connected
# Don't use hass.async_create_task to avoid holding up setup indefinitely.
hass.data[DOMAIN][
MYSENSORS_GATEWAY_START_TASK.format(entry.entry_id)
] = asyncio.create_task(
gateway.start()
hass.data[DOMAIN][MYSENSORS_GATEWAY_START_TASK.format(entry.entry_id)] = (
asyncio.create_task(gateway.start())
) # store the connect task so it can be cancelled in gw_stop
async def stop_this_gw(_: Event) -> None:

View File

@ -267,9 +267,9 @@ class NetatmoThermostat(NetatmoBaseEntity, ClimateEntity):
"name",
None,
)
self._attr_extra_state_attributes[
ATTR_SELECTED_SCHEDULE
] = self._selected_schedule
self._attr_extra_state_attributes[ATTR_SELECTED_SCHEDULE] = (
self._selected_schedule
)
self.async_write_ha_state()
self.data_handler.async_force_update(self._signal_name)
return
@ -430,14 +430,14 @@ class NetatmoThermostat(NetatmoBaseEntity, ClimateEntity):
self._selected_schedule = getattr(
self._room.home.get_selected_schedule(), "name", None
)
self._attr_extra_state_attributes[
ATTR_SELECTED_SCHEDULE
] = self._selected_schedule
self._attr_extra_state_attributes[ATTR_SELECTED_SCHEDULE] = (
self._selected_schedule
)
if self._model == NA_VALVE:
self._attr_extra_state_attributes[
ATTR_HEATING_POWER_REQUEST
] = self._room.heating_power_request
self._attr_extra_state_attributes[ATTR_HEATING_POWER_REQUEST] = (
self._room.heating_power_request
)
else:
for module in self._room.modules.values():
if hasattr(module, "boiler_status"):

View File

@ -149,13 +149,13 @@ class NetatmoOptionsFlowHandler(OptionsFlow):
async def async_step_public_weather(self, user_input: dict) -> ConfigFlowResult:
"""Manage configuration of Netatmo public weather sensors."""
if user_input is not None and CONF_NEW_AREA not in user_input:
self.options[CONF_WEATHER_AREAS][
user_input[CONF_AREA_NAME]
] = fix_coordinates(user_input)
self.options[CONF_WEATHER_AREAS][user_input[CONF_AREA_NAME]] = (
fix_coordinates(user_input)
)
self.options[CONF_WEATHER_AREAS][user_input[CONF_AREA_NAME]][
CONF_UUID
] = str(uuid.uuid4())
self.options[CONF_WEATHER_AREAS][user_input[CONF_AREA_NAME]][CONF_UUID] = (
str(uuid.uuid4())
)
return await self.async_step_public_weather_areas()

View File

@ -321,9 +321,9 @@ class LeafDataStore:
self.data[DATA_RANGE_AC] = None
if hasattr(server_response, "cruising_range_ac_off_km"):
self.data[
DATA_RANGE_AC_OFF
] = server_response.cruising_range_ac_off_km
self.data[DATA_RANGE_AC_OFF] = (
server_response.cruising_range_ac_off_km
)
else:
self.data[DATA_RANGE_AC_OFF] = None

View File

@ -82,9 +82,9 @@ class NZBGetConfigFlow(ConfigFlow, domain=DOMAIN):
}
if self.show_advanced_options:
data_schema[
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL)
] = bool
data_schema[vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL)] = (
bool
)
return self.async_show_form(
step_id="user",

View File

@ -105,9 +105,9 @@ class ONVIFCameraEntity(ONVIFBaseEntity, Camera):
self.stream_options[CONF_RTSP_TRANSPORT] = device.config_entry.options.get(
CONF_RTSP_TRANSPORT, next(iter(RTSP_TRANSPORTS))
)
self.stream_options[
CONF_USE_WALLCLOCK_AS_TIMESTAMPS
] = device.config_entry.options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS, False)
self.stream_options[CONF_USE_WALLCLOCK_AS_TIMESTAMPS] = (
device.config_entry.options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS, False)
)
self._basic_auth = (
device.config_entry.data.get(CONF_SNAPSHOT_AUTH)
== HTTP_BASIC_AUTHENTICATION

View File

@ -12,9 +12,9 @@ from homeassistant.util.decorator import Registry
from .models import Event
PARSERS: Registry[
str, Callable[[str, Any], Coroutine[Any, Any, Event | None]]
] = Registry()
PARSERS: Registry[str, Callable[[str, Any], Coroutine[Any, Any, Event | None]]] = (
Registry()
)
VIDEO_SOURCE_MAPPING = {
"vsconf": "VideoSourceToken",

View File

@ -65,20 +65,20 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
)
return await data.async_poll(connectable_device)
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = ActiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
needs_poll_method=_needs_poll,
poll_method=_async_poll,
# We will take advertisements from non-connectable devices
# since we will trade the BLEDevice for a connectable one
# if we need to poll it
connectable=False,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
ActiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
needs_poll_method=_needs_poll,
poll_method=_async_poll,
# We will take advertisements from non-connectable devices
# since we will trade the BLEDevice for a connectable one
# if we need to poll it
connectable=False,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -37,9 +37,9 @@ class OverkizNumberDescription(NumberEntityDescription):
min_value_state_name: str | None = None
max_value_state_name: str | None = None
inverted: bool = False
set_native_value: Callable[
[float, Callable[..., Awaitable[None]]], Awaitable[None]
] | None = None
set_native_value: (
Callable[[float, Callable[..., Awaitable[None]]], Awaitable[None]] | None
) = None
async def _async_set_native_value_boost_mode_duration(

View File

@ -186,10 +186,10 @@ class PlexLibrarySectionSensor(SensorEntity):
libtype=primary_libtype, includeCollections=False
)
for libtype in LIBRARY_ATTRIBUTE_TYPES.get(self.library_type, []):
self._attr_extra_state_attributes[
f"{libtype}s"
] = self.library_section.totalViewSize(
libtype=libtype, includeCollections=False
self._attr_extra_state_attributes[f"{libtype}s"] = (
self.library_section.totalViewSize(
libtype=libtype, includeCollections=False
)
)
recent_libtype = LIBRARY_RECENT_LIBTYPE.get(

View File

@ -482,9 +482,9 @@ class PlexServer:
continue
process_device("session", player)
available_clients[player.machineIdentifier][
"session"
] = self.active_sessions[unique_id]
available_clients[player.machineIdentifier]["session"] = (
self.active_sessions[unique_id]
)
for device in devices:
process_device("PMS", device)

View File

@ -51,13 +51,13 @@ class ProgettiHWSWConfigFlow(ConfigFlow, domain=DOMAIN):
relay_modes_schema = {}
for i in range(1, int(self.s1_in["relay_count"]) + 1):
relay_modes_schema[
vol.Required(f"relay_{str(i)}", default="bistable")
] = vol.In(
{
"bistable": "Bistable (ON/OFF Mode)",
"monostable": "Monostable (Timer Mode)",
}
relay_modes_schema[vol.Required(f"relay_{str(i)}", default="bistable")] = (
vol.In(
{
"bistable": "Bistable (ON/OFF Mode)",
"monostable": "Monostable (Timer Mode)",
}
)
)
return self.async_show_form(

View File

@ -294,15 +294,15 @@ class ProximityDataUpdateCoordinator(DataUpdateCoordinator[ProximityData]):
old_lat = None
old_lon = None
entities_data[state_change_data.entity_id][
ATTR_DIR_OF_TRAVEL
] = self._calc_direction_of_travel(
zone_state,
new_state,
old_lat,
old_lon,
new_state.attributes.get(ATTR_LATITUDE),
new_state.attributes.get(ATTR_LONGITUDE),
entities_data[state_change_data.entity_id][ATTR_DIR_OF_TRAVEL] = (
self._calc_direction_of_travel(
zone_state,
new_state,
old_lat,
old_lon,
new_state.attributes.get(ATTR_LATITUDE),
new_state.attributes.get(ATTR_LONGITUDE),
)
)
# takeover data for legacy proximity entity
@ -337,9 +337,9 @@ class ProximityDataUpdateCoordinator(DataUpdateCoordinator[ProximityData]):
if cast(int, nearest_distance_to) == int(distance_to):
_LOGGER.debug("set equally close entity_data: %s", entity_data)
proximity_data[
ATTR_NEAREST
] = f"{proximity_data[ATTR_NEAREST]}, {str(entity_data[ATTR_NAME])}"
proximity_data[ATTR_NEAREST] = (
f"{proximity_data[ATTR_NEAREST]}, {str(entity_data[ATTR_NAME])}"
)
return ProximityData(proximity_data, entities_data)

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = QingpingBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -61,10 +61,13 @@ class RadarrSensorEntityDescription(
):
"""Class to describe a Radarr sensor."""
description_fn: Callable[
[RadarrSensorEntityDescription[T], RootFolder],
tuple[RadarrSensorEntityDescription[T], str] | None,
] | None = None
description_fn: (
Callable[
[RadarrSensorEntityDescription[T], RootFolder],
tuple[RadarrSensorEntityDescription[T], str] | None,
]
| None
) = None
SENSOR_TYPES: dict[str, RadarrSensorEntityDescription[Any]] = {

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = RAPTPillBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -89,9 +89,9 @@ class ReCollectWasteSensor(ReCollectWasteEntity, SensorEntity):
self._attr_native_value = None
else:
self._attr_extra_state_attributes[ATTR_AREA_NAME] = event.area_name
self._attr_extra_state_attributes[
ATTR_PICKUP_TYPES
] = async_get_pickup_type_names(self._entry, event.pickup_types)
self._attr_extra_state_attributes[ATTR_PICKUP_TYPES] = (
async_get_pickup_type_names(self._entry, event.pickup_types)
)
self._attr_native_value = event.date
super()._handle_coordinator_update()

View File

@ -878,9 +878,10 @@ def _apply_update( # noqa: C901
if engine.dialect.name == SupportedDialect.MYSQL:
# Ensure the row format is dynamic or the index
# unique will be too large
with contextlib.suppress(SQLAlchemyError), session_scope(
session=session_maker()
) as session:
with (
contextlib.suppress(SQLAlchemyError),
session_scope(session=session_maker()) as session,
):
connection = session.connection()
# This is safe to run multiple times and fast
# since the table is small.
@ -1132,9 +1133,10 @@ def _correct_table_character_set_and_collation(
"computers. Please be patient!",
table,
)
with contextlib.suppress(SQLAlchemyError), session_scope(
session=session_maker()
) as session:
with (
contextlib.suppress(SQLAlchemyError),
session_scope(session=session_maker()) as session,
):
connection = session.connection()
connection.execute(
# Using LOCK=EXCLUSIVE to prevent the database from corrupting
@ -1579,9 +1581,9 @@ def migrate_event_type_ids(instance: Recorder) -> bool:
assert (
db_event_type.event_type is not None
), "event_type should never be None"
event_type_to_id[
db_event_type.event_type
] = db_event_type.event_type_id
event_type_to_id[db_event_type.event_type] = (
db_event_type.event_type_id
)
event_type_manager.clear_non_existent(db_event_type.event_type)
session.execute(
@ -1652,9 +1654,9 @@ def migrate_entity_ids(instance: Recorder) -> bool:
assert (
db_states_metadata.entity_id is not None
), "entity_id should never be None"
entity_id_to_metadata_id[
db_states_metadata.entity_id
] = db_states_metadata.metadata_id
entity_id_to_metadata_id[db_states_metadata.entity_id] = (
db_states_metadata.metadata_id
)
session.execute(
update(States),

View File

@ -16,13 +16,11 @@ EMPTY_JSON_OBJECT = "{}"
@overload
def process_timestamp(ts: None) -> None:
...
def process_timestamp(ts: None) -> None: ...
@overload
def process_timestamp(ts: datetime) -> datetime:
...
def process_timestamp(ts: datetime) -> datetime: ...
def process_timestamp(ts: datetime | None) -> datetime | None:
@ -36,13 +34,11 @@ def process_timestamp(ts: datetime | None) -> datetime | None:
@overload
def process_timestamp_to_utc_isoformat(ts: None) -> None:
...
def process_timestamp_to_utc_isoformat(ts: None) -> None: ...
@overload
def process_timestamp_to_utc_isoformat(ts: datetime) -> str:
...
def process_timestamp_to_utc_isoformat(ts: datetime) -> str: ...
def process_timestamp_to_utc_isoformat(ts: datetime | None) -> str | None:

View File

@ -239,9 +239,9 @@ class PublicTransportData:
}
if real_time_date is not None and real_time_time is not None:
departure_data[
ATTR_REAL_TIME_AT
] = f"{real_time_date} {real_time_time}"
departure_data[ATTR_REAL_TIME_AT] = (
f"{real_time_date} {real_time_time}"
)
if item.get("rtTrack") is not None:
departure_data[ATTR_TRACK] = item.get("rtTrack")

View File

@ -193,9 +193,9 @@ class ReolinkFlowHandler(ConfigFlow, domain=DOMAIN):
errors[CONF_HOST] = "api_error"
except ReolinkWebhookException as err:
placeholders["error"] = str(err)
placeholders[
"more_info"
] = "https://www.home-assistant.io/more-info/no-url-available/#configuring-the-instance-url"
placeholders["more_info"] = (
"https://www.home-assistant.io/more-info/no-url-available/#configuring-the-instance-url"
)
errors["base"] = "webhook_exception"
except (ReolinkError, ReolinkException) as err:
placeholders["error"] = str(err)

View File

@ -1,6 +1,5 @@
"""Base entity for ROMY."""
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = RuuvitagBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -68,7 +68,7 @@ class SABnzbdConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_import(self, import_data):
"""Import sabnzbd config from configuration.yaml."""
protocol = "https://" if import_data[CONF_SSL] else "http://"
import_data[
CONF_URL
] = f"{protocol}{import_data[CONF_HOST]}:{import_data[CONF_PORT]}"
import_data[CONF_URL] = (
f"{protocol}{import_data[CONF_HOST]}:{import_data[CONF_PORT]}"
)
return await self.async_step_user(import_data)

View File

@ -184,13 +184,13 @@ class SamsungTVConfigFlow(ConfigFlow, domain=DOMAIN):
if self._model:
updates[CONF_MODEL] = self._model
if self._ssdp_rendering_control_location:
updates[
CONF_SSDP_RENDERING_CONTROL_LOCATION
] = self._ssdp_rendering_control_location
updates[CONF_SSDP_RENDERING_CONTROL_LOCATION] = (
self._ssdp_rendering_control_location
)
if self._ssdp_main_tv_agent_location:
updates[
CONF_SSDP_MAIN_TV_AGENT_LOCATION
] = self._ssdp_main_tv_agent_location
updates[CONF_SSDP_MAIN_TV_AGENT_LOCATION] = (
self._ssdp_main_tv_agent_location
)
self._abort_if_unique_id_configured(updates=updates, reload_on_update=False)
async def _async_create_bridge(self) -> None:
@ -388,13 +388,13 @@ class SamsungTVConfigFlow(ConfigFlow, domain=DOMAIN):
or update_model
):
if update_ssdp_rendering_control_location:
data[
CONF_SSDP_RENDERING_CONTROL_LOCATION
] = self._ssdp_rendering_control_location
data[CONF_SSDP_RENDERING_CONTROL_LOCATION] = (
self._ssdp_rendering_control_location
)
if update_ssdp_main_tv_agent_location:
data[
CONF_SSDP_MAIN_TV_AGENT_LOCATION
] = self._ssdp_main_tv_agent_location
data[CONF_SSDP_MAIN_TV_AGENT_LOCATION] = (
self._ssdp_main_tv_agent_location
)
if update_mac:
data[CONF_MAC] = self._mac
if update_model:

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = SensirionBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.ACTIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -872,9 +872,9 @@ class SensorExtraStoredData(ExtraStoredData):
def as_dict(self) -> dict[str, Any]:
"""Return a dict representation of the sensor data."""
native_value: StateType | date | datetime | Decimal | dict[
str, str
] = self.native_value
native_value: StateType | date | datetime | Decimal | dict[str, str] = (
self.native_value
)
if isinstance(native_value, (date, datetime)):
native_value = {
"__type": str(type(native_value)),

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = SensorProBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

View File

@ -26,14 +26,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
address = entry.unique_id
assert address is not None
data = SensorPushBluetoothDeviceData()
coordinator = hass.data.setdefault(DOMAIN, {})[
entry.entry_id
] = PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
coordinator = hass.data.setdefault(DOMAIN, {})[entry.entry_id] = (
PassiveBluetoothProcessorCoordinator(
hass,
_LOGGER,
address=address,
mode=BluetoothScanningMode.PASSIVE,
update_method=data.update,
)
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(

Some files were not shown because too many files have changed in this diff Show More