ESPHome/esphome/coroutine.py

253 lines
9.1 KiB
Python

"""
ESPHome's coroutine system.
The Problem: When running the code generationg, components can depend on variables being registered.
For example, an i2c-based sensor would need the i2c bus component to first be declared before the
codegen can emit code using that variable (or otherwise the C++ won't compile).
ESPHome's codegen system solves this by using coroutine-like methods. When a component depends on
a variable, it waits for it to be registered using `await cg.get_variable()`. If the variable
hasn't been registered yet, control will be yielded back to another component until the variable
is registered. This leads to a topological sort, solving the dependency problem.
Importantly, ESPHome only uses the coroutine *syntax*, no actual asyncio event loop is running in
the background. This is so that we can ensure the order of execution is constant for the same
YAML configuration, thus main.cpp only has to be recompiled if the configuration actually changes.
There are two syntaxes for ESPHome coroutines ("old style" vs "new style" coroutines).
"new style" - This is very much like coroutines you might be used to:
```py
async def my_coroutine(config):
var = await cg.get_variable(config[CONF_ID])
await some_other_coroutine(xyz)
return var
```
new style coroutines are `async def` methods that use `await` to await the result of another coroutine,
and can return values using a `return` statement.
"old style" - This was a hack for when ESPHome still had to run on python 2, but is still compatible
```py
@coroutine
def my_coroutine(config):
var = yield cg.get_variable(config[CONF_ID])
yield some_other_coroutine(xyz)
yield var
```
Here everything is combined in `yield` expressions. You await other coroutines using `yield` and
the last `yield` expression defines what is returned.
"""
import collections
import functools
import heapq
import inspect
import logging
import types
from typing import Any, Awaitable, Callable, Generator, Iterator, List, Tuple
_LOGGER = logging.getLogger(__name__)
def coroutine(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
"""Decorator to apply to methods to convert them to ESPHome coroutines."""
if getattr(func, "_esphome_coroutine", False):
# If func is already a coroutine, do not re-wrap it (performance)
return func
if inspect.isasyncgenfunction(func):
# Trade-off: In ESPHome, there's not really a use-case for async generators.
# and during the transition to new-style syntax it will happen that a `yield`
# is not replaced properly, so don't accept async generators.
raise ValueError(
f"Async generator functions are not allowed. "
f"Please check whether you've replaced all yields with awaits/returns. "
f"See {func} in {func.__module__}"
)
if inspect.iscoroutinefunction(func):
# A new-style async-def coroutine function, no conversion needed.
return func
if inspect.isgeneratorfunction(func):
@functools.wraps(func)
def coro(*args, **kwargs):
gen = func(*args, **kwargs)
ret = yield from _flatten_generator(gen)
return ret
else:
# A "normal" function with no `yield` statements, convert to generator
# that includes a yield just so it's also a generator function
@functools.wraps(func)
def coro(*args, **kwargs):
res = func(*args, **kwargs)
yield
return res
# Add coroutine internal python flag so that it can be awaited from new-style coroutines.
coro = types.coroutine(coro)
# pylint: disable=protected-access
coro._esphome_coroutine = True
return coro
def coroutine_with_priority(priority: float):
"""Decorator to apply to functions to convert them to ESPHome coroutines.
:param priority: priority with which to schedule the coroutine, higher priorities run first.
"""
def decorator(func):
coro = coroutine(func)
coro.priority = priority
return coro
return decorator
def _flatten_generator(gen: Generator[Any, Any, Any]):
to_send = None
while True:
try:
# Run until next yield expression
val = gen.send(to_send)
except StopIteration as e:
# return statement or end of function
# From py3.3, return with a value is allowed in generators,
# and return value is transported in the value field of the exception.
# If we find a value in the exception, use that as the return value,
# otherwise use the value from the last yield statement ("old style")
ret = to_send if e.value is None else e.value
return ret
if isinstance(val, collections.abc.Awaitable):
# yielded object that is awaitable (like `yield some_new_style_method()`)
# yield from __await__() like actual coroutines would.
to_send = yield from val.__await__()
elif inspect.isgenerator(val):
# Old style, like `yield cg.get_variable()`
to_send = yield from _flatten_generator(val)
else:
# Could be the last expression from this generator, record this as the return value
to_send = val
# perform a yield so that expressions like `while some_condition(): yield None`
# do not run without yielding control back to the top
yield
class FakeAwaitable:
"""Convert a generator to an awaitable object.
Needed for internals of `cg.get_variable`. There we can't use @coroutine because
native coroutines await from types.coroutine() directly without yielding back control to the top
(likely as a performance enhancement).
If we instead wrap the generator in this FakeAwaitable, control is yielded back to the top
(reason unknown).
"""
def __init__(self, gen: Generator[Any, Any, Any]) -> None:
self._gen = gen
def __await__(self):
ret = yield from self._gen
return ret
@functools.total_ordering
class _Task:
def __init__(
self,
priority: float,
id_number: int,
iterator: Iterator[None],
original_function: Any,
):
self.priority = priority
self.id_number = id_number
self.iterator = iterator
self.original_function = original_function
def with_priority(self, priority: float) -> "_Task":
return _Task(priority, self.id_number, self.iterator, self.original_function)
@property
def _cmp_tuple(self) -> Tuple[float, int]:
return (-self.priority, self.id_number)
def __eq__(self, other):
return self._cmp_tuple == other._cmp_tuple
def __ne__(self, other):
return not (self == other)
def __lt__(self, other):
return self._cmp_tuple < other._cmp_tuple
class FakeEventLoop:
"""Emulate an asyncio EventLoop to run some registered coroutine jobs in sequence."""
def __init__(self):
self._pending_tasks: List[_Task] = []
self._task_counter = 0
def add_job(self, func, *args, **kwargs):
"""Add a job to the task queue,
Optionally retrieves priority from the function object, and schedules according to that.
"""
if inspect.iscoroutine(func):
raise ValueError("Can only add coroutine functions, not coroutine objects")
if inspect.iscoroutinefunction(func):
coro = func
gen = coro(*args, **kwargs).__await__()
else:
coro = coroutine(func)
gen = coro(*args, **kwargs)
prio = getattr(coro, "priority", 0.0)
task = _Task(prio, self._task_counter, gen, func)
self._task_counter += 1
heapq.heappush(self._pending_tasks, task)
def flush_tasks(self):
"""Run until all tasks have been completed.
:raises RuntimeError: if a deadlock is detected.
"""
i = 0
while self._pending_tasks:
i += 1
if i > 1000000:
# Detect deadlock/circular dependency by measuring how many times tasks have been
# executed. On the big tests/test1.yaml we only get to a fraction of this, so
# this shouldn't be a problem.
raise RuntimeError(
"Circular dependency detected! "
"Please run with -v option to see what functions failed to "
"complete."
)
task: _Task = heapq.heappop(self._pending_tasks)
_LOGGER.debug(
"Running %s in %s (num %s)",
task.original_function.__qualname__,
task.original_function.__module__,
task.id_number,
)
try:
next(task.iterator)
# Decrease priority over time, so that if this task is blocked
# due to a dependency others will clear the dependency
# This could be improved with a less naive approach
new_task = task.with_priority(task.priority - 1)
heapq.heappush(self._pending_tasks, new_task)
except StopIteration:
_LOGGER.debug(" -> finished")