1
0
mirror of https://github.com/esphome/esphome.git synced 2024-12-02 13:58:48 +01:00
esphome/esphome/config.py
2024-07-29 14:07:44 +12:00

1125 lines
39 KiB
Python

from __future__ import annotations
import abc
from contextlib import contextmanager
import contextvars
import functools
import heapq
import logging
import re
from typing import Any, Union
import voluptuous as vol
from esphome import core, loader, pins, yaml_util
from esphome.config_helpers import Extend, Remove
import esphome.config_validation as cv
from esphome.const import (
CONF_ESPHOME,
CONF_EXTERNAL_COMPONENTS,
CONF_ID,
CONF_PACKAGES,
CONF_PLATFORM,
CONF_SUBSTITUTIONS,
TARGET_PLATFORMS,
)
from esphome.core import CORE, DocumentRange, EsphomeError
import esphome.core.config as core_config
import esphome.final_validate as fv
from esphome.helpers import indent
from esphome.loader import ComponentManifest, get_component, get_platform
from esphome.log import Fore, color
from esphome.types import ConfigFragmentType, ConfigType
from esphome.util import OrderedDict, safe_print
from esphome.voluptuous_schema import ExtraKeysInvalid
from esphome.yaml_util import ESPForceValue, ESPHomeDataBase, is_secret
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
def iter_components(config):
    """Yield ``(name, manifest)`` for each component and platform in *config*.

    Every top-level domain yields its component manifest. For platform
    components, each entry of the domain's config additionally yields
    ``("<domain>.<platform>", platform_manifest)``.
    """
    for domain, conf in config.items():
        manifest = get_component(domain)
        yield domain, manifest
        if not manifest.is_platform_component:
            continue
        for entry in conf:
            platform_name = entry[CONF_PLATFORM]
            yield f"{domain}.{platform_name}", get_platform(domain, platform_name)
def iter_component_configs(config):
    """Yield ``(name, manifest, config_fragment)`` triples from *config*.

    A multi-conf component yields one triple per entry of its config, any
    other component yields a single triple with the whole config. Platform
    components then also yield one triple per platform entry, named
    ``"<domain>.<platform>"``.
    """
    for domain, conf in config.items():
        manifest = get_component(domain)
        # Treat a single config as a one-element sequence so both cases
        # share the same loop.
        fragments = conf if manifest.multi_conf else (conf,)
        for fragment in fragments:
            yield domain, manifest, fragment
        if not manifest.is_platform_component:
            continue
        for entry in conf:
            platform_name = entry[CONF_PLATFORM]
            yield (
                f"{domain}.{platform_name}",
                get_platform(domain, platform_name),
                entry,
            )
# A path into the config tree: a list of dict keys (str) and list indices (int).
ConfigPath = list[Union[str, int]]
# Context variable set to the path of the fragment being schema-validated
# (see SchemaValidationStep.run).
path_context = contextvars.ContextVar("Config path")
def _path_begins_with(path: ConfigPath, other: ConfigPath) -> bool:
if len(path) < len(other):
return False
return path[: len(other)] == other
@functools.total_ordering
class _ValidationStepTask:
def __init__(self, priority: float, id_number: int, step: ConfigValidationStep):
self.priority = priority
self.id_number = id_number
self.step = step
@property
def _cmp_tuple(self) -> tuple[float, int]:
return (-self.priority, self.id_number)
def __eq__(self, other):
return self._cmp_tuple == other._cmp_tuple
def __ne__(self, other):
return not (self == other)
def __lt__(self, other):
return self._cmp_tuple < other._cmp_tuple
class Config(OrderedDict, fv.FinalValidateConfig):
    """The configuration being validated, plus validation bookkeeping.

    Besides the config mapping itself, an instance tracks the collected
    errors, the output paths used when printing, the declared IDs and the
    queue of pending validation steps.
    """

    def __init__(self):
        super().__init__()
        # A list of voluptuous errors
        self.errors: list[vol.Invalid] = []
        # A list of paths that should be fully outputted
        # The values will be the paths to all "domain", for example (['logger'], 'logger')
        # or (['sensor', 'ultrasonic'], 'sensor.ultrasonic')
        self.output_paths: list[tuple[ConfigPath, str]] = []
        # A list of components ids with the config path
        self.declare_ids: list[tuple[core.ID, ConfigPath]] = []
        self._data = {}
        # Store pending validation tasks (in heap order)
        self._validation_tasks: list[_ValidationStepTask] = []
        # ID to ensure stable order for keys with equal priority
        self._validation_tasks_id = 0

    def add_error(self, error: vol.Invalid) -> None:
        """Record *error*, flattening ``MultipleInvalid`` into its parts."""
        if isinstance(error, vol.MultipleInvalid):
            for err in error.errors:
                self.add_error(err)
            return
        if cv.ROOT_CONFIG_PATH in error.path:
            # Root value means that the path before the root should be ignored
            last_root = max(
                i for i, v in enumerate(error.path) if v is cv.ROOT_CONFIG_PATH
            )
            # can't change the path so re-create the error
            error = vol.Invalid(
                message=error.error_message,
                path=error.path[last_root + 1 :],
                error_type=error.error_type,
            )
        self.errors.append(error)

    def add_validation_step(self, step: ConfigValidationStep):
        """Queue *step*; equal-priority steps keep their insertion order."""
        id_num = self._validation_tasks_id
        self._validation_tasks_id += 1
        heapq.heappush(
            self._validation_tasks, _ValidationStepTask(step.priority, id_num, step)
        )

    def run_validation_steps(self):
        """Run all queued validation steps, highest priority first.

        The queue is always drained completely, including steps that are
        queued while running. Steps that must not run once errors exist
        check ``result.errors`` themselves; stopping here on the first
        error would hide all later errors from the user and leave stale
        tasks in the queue.
        """
        while self._validation_tasks:
            task = heapq.heappop(self._validation_tasks)
            task.step.run(self)

    @contextmanager
    def catch_error(self, path=None):
        """Context manager recording ``vol.Invalid`` raised inside the block.

        ``cv.FinalExternalInvalid`` errors are recorded unchanged; any other
        ``vol.Invalid`` first gets *path* prepended to its own path.
        """
        path = path or []
        try:
            yield
        except cv.FinalExternalInvalid as e:
            self.add_error(e)
        except vol.Invalid as e:
            e.prepend(path)
            self.add_error(e)

    def add_str_error(self, message: str, path: ConfigPath) -> None:
        """Record a new error with *message* at *path*."""
        self.add_error(vol.Invalid(message, path))

    def add_output_path(self, path: ConfigPath, domain: str) -> None:
        """Register *path* as the output path for *domain*."""
        self.output_paths.append((path, domain))

    def remove_output_path(self, path: ConfigPath, domain: str) -> None:
        """Unregister a previously added output path.

        Raises:
            ValueError: if the ``(path, domain)`` pair is not registered.
        """
        self.output_paths.remove((path, domain))

    def is_in_error_path(self, path: ConfigPath) -> bool:
        """Return True if any recorded error lies at or below *path*."""
        return any(_path_begins_with(err.path, path) for err in self.errors)

    def set_by_path(self, path, value):
        """Store *value* at the nested location described by *path*."""
        conf = self
        for key in path[:-1]:
            conf = conf[key]
        conf[path[-1]] = value

    def get_error_for_path(self, path: ConfigPath) -> vol.Invalid | None:
        """Pop and return the first error whose deepest reachable path is *path*.

        The returned error is removed from ``self.errors``. Returns None
        when no error matches.
        """
        for err in self.errors:
            if self.get_deepest_path(err.path) == path:
                # Safe although we are iterating: we return immediately.
                self.errors.remove(err)
                return err
        return None

    def get_deepest_document_range_for_path(
        self, path: ConfigPath, get_key: bool = False
    ) -> DocumentRange | None:
        """Return the most specific source range found while following *path*.

        Walks *path* as far as it can be resolved and returns the last
        document range seen on the way (from keys, values, or a dict's
        ``platform`` entry). With *get_key* set, the range of the final
        key itself is returned if that key carries one. Returns None when
        no range information was found.
        """
        data = self
        doc_range = None
        for index, path_item in enumerate(path):
            try:
                if path_item in data:
                    # Fetch the stored key object, which may carry range info
                    # that the lookup key does not.
                    key_data = [x for x in data.keys() if x == path_item][0]
                    if isinstance(key_data, ESPHomeDataBase):
                        doc_range = key_data.esp_range
                        if get_key and index == len(path) - 1:
                            return doc_range
                data = data[path_item]
            except (KeyError, IndexError, TypeError, AttributeError):
                return doc_range
            if isinstance(data, core.ID):
                data = data.id
            if isinstance(data, ESPHomeDataBase) and data.esp_range is not None:
                doc_range = data.esp_range
            elif isinstance(data, dict):
                platform_item = data.get("platform")
                if (
                    isinstance(platform_item, ESPHomeDataBase)
                    and platform_item.esp_range is not None
                ):
                    doc_range = platform_item.esp_range
        return doc_range

    def get_nested_item(
        self, path: ConfigPath, raise_error: bool = False
    ) -> ConfigFragmentType:
        """Return the item at *path*.

        If the path cannot be followed, return ``{}``, or re-raise the
        lookup error (KeyError, IndexError or TypeError) when *raise_error*
        is set.
        """
        data = self
        for item_index in path:
            try:
                data = data[item_index]
            except (KeyError, IndexError, TypeError):
                if raise_error:
                    raise
                return {}
        return data

    def get_deepest_path(self, path: ConfigPath) -> ConfigPath:
        """Return the path that is the deepest reachable by following path."""
        data = self
        part = []
        for item_index in path:
            try:
                data = data[item_index]
            except (KeyError, IndexError, TypeError):
                return part
            part.append(item_index)
        return part

    def get_path_for_id(self, id: core.ID):
        """Return the config path at which the given ID was declared.

        Raises:
            KeyError: if no declared ID matches.
        """
        for declared_id, path in self.declare_ids:
            if declared_id.id == str(id):
                return path
        raise KeyError(f"ID {id} not found in configuration")

    def get_config_for_path(self, path: ConfigPath) -> ConfigFragmentType:
        """Return the config fragment at *path*, raising if it does not exist."""
        return self.get_nested_item(path, raise_error=True)

    @property
    def data(self):
        """Return temporary data used by final validation functions."""
        return self._data
def iter_ids(config, path=None):
    """Recursively yield ``(id, path)`` for every ID found in *config*.

    IDs are found as values, as dict keys, and as the IDs a lambda
    requires. *path* is the location of the containing value; for a dict
    key that is an ID, it is the path of the dict itself.
    """
    path = path or []
    if isinstance(config, core.ID):
        yield config, path
        return
    if isinstance(config, core.Lambda):
        for required in config.requires_ids:
            yield required, path
        return
    if isinstance(config, list):
        for index, element in enumerate(config):
            yield from iter_ids(element, path + [index])
        return
    if isinstance(config, dict):
        for key, value in config.items():
            if isinstance(key, core.ID):
                yield key, path
            yield from iter_ids(value, path + [key])
def recursive_check_replaceme(value):
    """Reject configs that still contain the sample placeholder ``REPLACEME``.

    Lists and dicts are checked recursively. A value tagged with ``!force``
    (an ``ESPForceValue``) is accepted as-is, so the literal string can
    still be used on purpose, as the error message advises.

    Returns:
        The checked value.

    Raises:
        cv.Invalid: if an unforced ``"REPLACEME"`` string is found.
    """
    if isinstance(value, list):
        return cv.Schema([recursive_check_replaceme])(value)
    if isinstance(value, dict):
        return cv.Schema({cv.valid: recursive_check_replaceme})(value)
    if isinstance(value, ESPForceValue):
        # Explicitly forced by the user: skip the placeholder check.
        return value
    if isinstance(value, str) and value == "REPLACEME":
        raise cv.Invalid(
            "Found 'REPLACEME' in configuration, this is most likely an error. "
            "Please make sure you have replaced all fields from the sample "
            "configuration.\n"
            "If you want to use the literal REPLACEME string, "
            'please use "!force REPLACEME"'
        )
    return value
class ConfigValidationStep(abc.ABC):
    """Base class for a single step of the validation phase.

    Subclasses implement ``run`` and may override ``priority``.
    """

    # Steps with a higher priority are run earlier.
    priority: float = 0.0

    @abc.abstractmethod
    def run(self, result: Config) -> None:
        """Execute this step against *result*."""
        ...
class LoadValidationStep(ConfigValidationStep):
    """Load step, this step is called once for each domain config fragment.

    Responsibilities:
    - Load component code
    - Ensure all AUTO_LOADs are added
    - Set output paths of result
    """

    def __init__(self, domain: str, conf: ConfigType):
        self.domain = domain
        self.conf = conf

    def run(self, result: Config) -> None:
        """Load the component for ``self.domain`` and queue its follow-up steps.

        Stores the raw config in *result*, queues auto-load steps and a
        metadata step for the component, and, for platform components,
        does the same for every platform entry.
        """
        if self.domain.startswith("."):
            # Ignore top-level keys starting with a dot
            return
        result.add_output_path([self.domain], self.domain)
        component = get_component(self.domain)
        if (
            component is not None
            and component.multi_conf_no_default
            and isinstance(self.conf, core.AutoLoad)
        ):
            # Auto-loaded multi-conf component without default: start empty
            self.conf = []
        result[self.domain] = self.conf
        path = [self.domain]
        if component is None:
            result.add_str_error(f"Component not found: {self.domain}", path)
            return
        CORE.loaded_integrations.add(self.domain)

        # Process AUTO_LOAD
        for load in component.auto_load:
            if load not in result:
                result.add_validation_step(AutoLoadValidationStep(load))

        result.add_validation_step(
            MetadataValidationStep([self.domain], self.domain, self.conf, component)
        )

        if not component.is_platform_component:
            return

        # This is a platform component, proceed to reading platform entries
        # Remove the domain itself as an output path; each platform entry
        # gets its own output path below
        result.remove_output_path([self.domain], self.domain)

        # Ensure conf is a list
        if not self.conf:
            result[self.domain] = self.conf = []
        elif not isinstance(self.conf, list):
            result[self.domain] = self.conf = [self.conf]

        for i, p_config in enumerate(self.conf):
            path = [self.domain, i]
            # Construct temporary unknown output path
            p_domain = f"{self.domain}.unknown"
            result.add_output_path(path, p_domain)
            result[self.domain][i] = p_config
            if not isinstance(p_config, dict):
                result.add_str_error("Platform schemas must be key-value pairs.", path)
                continue
            p_name = p_config.get("platform")
            if p_name is None:
                # No platform given: report the most specific reason we can
                p_id = p_config.get(CONF_ID)
                if isinstance(p_id, Extend):
                    result.add_str_error(
                        f"Source for extension of ID '{p_id.value}' was not found.",
                        path + [CONF_ID],
                    )
                    continue
                if isinstance(p_id, Remove):
                    result.add_str_error(
                        f"Source for removal of ID '{p_id.value}' was not found.",
                        path + [CONF_ID],
                    )
                    continue
                result.add_str_error(
                    f"'{self.domain}' requires a 'platform' key but it was not specified.",
                    path,
                )
                continue
            # Remove temp output path and construct new one
            result.remove_output_path(path, p_domain)
            p_domain = f"{self.domain}.{p_name}"
            result.add_output_path(path, p_domain)
            # Try Load platform
            platform = get_platform(self.domain, p_name)
            if platform is None:
                result.add_str_error(f"Platform not found: '{p_domain}'", path)
                continue
            CORE.loaded_integrations.add(p_name)

            # Process AUTO_LOAD
            for load in platform.auto_load:
                if load not in result:
                    result.add_validation_step(AutoLoadValidationStep(load))

            result.add_validation_step(
                MetadataValidationStep(path, p_domain, p_config, platform)
            )
class AutoLoadValidationStep(ConfigValidationStep):
    """Load a component that another component requested via AUTO_LOAD.

    Nothing happens if the domain is already present in the result by the
    time this step runs.
    """

    # Runs after all regular load steps (priority 0.0) have taken place.
    priority = -1.0

    def __init__(self, domain: str):
        self.domain = domain

    def run(self, result: Config) -> None:
        """Queue a load step for the domain unless it is already loaded."""
        if self.domain not in result:
            result.add_validation_step(
                LoadValidationStep(self.domain, core.AutoLoad())
            )
class MetadataValidationStep(ConfigValidationStep):
    """Validate component metadata

    Responsibilities:
    - Config transformation (nullable, multi conf)
    - Check dependencies
    - Check conflicts
    - Check supported target platforms
    """

    # All components need to be loaded first to ensure dependency check works
    priority = -2.0

    def __init__(
        self,
        path: ConfigPath,
        domain: str,
        conf: ConfigType,
        component: ComponentManifest,
    ) -> None:
        self.path = path
        self.domain = domain
        self.conf = conf
        self.comp = component

    def run(self, result: Config) -> None:
        """Check dependencies and conflicts, then queue schema validation.

        Any failed check records an error on *result* and stops this step
        before a schema validation step is queued.
        """
        if self.conf is None:
            # An empty YAML value: substitute the component's empty default
            if self.comp.multi_conf and self.comp.multi_conf_no_default:
                result[self.domain] = self.conf = []
            else:
                result[self.domain] = self.conf = {}

        success = True
        for dependency in self.comp.dependencies:
            # A dependency is either "component" or "component.platform"
            dependency_parts = dependency.split(".")
            if len(dependency_parts) > 2:
                result.add_str_error(
                    "Dependencies must be specified as a single component or in component.platform format only",
                    self.path,
                )
                return
            component_dep = dependency_parts[0]
            # Equal to component_dep when no platform part was given
            platform_dep = dependency_parts[-1]
            if component_dep not in result:
                result.add_str_error(
                    f"Component {self.domain} requires component {component_dep}",
                    self.path,
                )
                success = False
            elif component_dep != platform_dep and (
                not isinstance(platform_list := result.get(component_dep), list)
                or not any(CONF_PLATFORM in p for p in platform_list)
                or not any(p[CONF_PLATFORM] == platform_dep for p in platform_list)
            ):
                result.add_str_error(
                    f"Component {self.domain} requires 'platform: {platform_dep}' in component '{component_dep}'",
                    self.path,
                )
                success = False
        if not success:
            return

        success = True
        for conflict in self.comp.conflicts_with:
            if conflict in result:
                result.add_str_error(
                    f"Component {self.domain} cannot be used together with component {conflict}",
                    self.path,
                )
                success = False
        if not success:
            return

        if (
            not self.comp.is_platform_component
            and self.comp.config_schema is None
            and not isinstance(self.conf, core.AutoLoad)
        ):
            result.add_str_error(
                f"Component {self.domain} cannot be loaded via YAML "
                "(no CONFIG_SCHEMA).",
                self.path,
            )
            return

        if self.comp.multi_conf:
            if not isinstance(self.conf, list):
                result[self.domain] = self.conf = [self.conf]
            # multi_conf is either a bool or the maximum number of entries
            if (
                not isinstance(self.comp.multi_conf, bool)
                and len(self.conf) > self.comp.multi_conf
            ):
                result.add_str_error(
                    f"Component {self.domain} supports a maximum of {self.comp.multi_conf} "
                    f"entries ({len(self.conf)} found).",
                    self.path,
                )
                return
            # Validate every entry separately, each under its own path
            for i, part_conf in enumerate(self.conf):
                result.add_validation_step(
                    SchemaValidationStep(
                        self.domain, self.path + [i], part_conf, self.comp
                    )
                )
            return

        result.add_validation_step(
            SchemaValidationStep(self.domain, self.path, self.conf, self.comp)
        )
class SchemaValidationStep(ConfigValidationStep):
    """Schema validation step.

    During this step all CONFIG_SCHEMAs are checked against the configs.
    """

    def __init__(
        self, domain: str, path: ConfigPath, conf: ConfigType, comp: ComponentManifest
    ):
        self.domain = domain
        self.path = path
        self.conf = conf
        self.comp = comp

    def run(self, result: Config) -> None:
        """Validate ``self.conf`` with the component's schema.

        On success the validated config replaces the raw one in *result*.
        Validation errors are recorded on *result* instead of being raised.
        A final-validate step is queued afterwards in either case.
        """
        token = path_context.set(self.path)
        try:
            with result.catch_error(self.path):
                if self.comp.is_platform:
                    # Remove 'platform' key for validation
                    input_conf = OrderedDict(self.conf)
                    platform_val = input_conf.pop("platform")
                    schema = cv.Schema(self.comp.config_schema)
                    validated = schema(input_conf)
                    # Ensure result is OrderedDict so we can call move_to_end
                    if not isinstance(validated, OrderedDict):
                        validated = OrderedDict(validated)
                    validated["platform"] = platform_val
                    validated.move_to_end("platform", last=False)
                    result.set_by_path(self.path, validated)
                elif self.comp.config_schema is not None:
                    schema = cv.Schema(self.comp.config_schema)
                    validated = schema(self.conf)
                    result.set_by_path(self.path, validated)
        finally:
            # Always restore the previous path, also when an unexpected
            # (non-validation) exception propagates out of a schema.
            path_context.reset(token)
        result.add_validation_step(FinalValidateValidationStep(self.path, self.comp))
class IDPassValidationStep(ConfigValidationStep):
    """ID Pass step.

    During this step all ID references are checked.

    If an automatic ID reference is used, a fitting declared ID is automatically searched.
    Also checks duplicate ID names, and that referenced IDs are declared.
    """

    # Has to happen after all schemas validated
    priority = -10.0

    def __init__(self) -> None:
        pass

    def run(self, result: Config) -> None:
        """Collect declared IDs, resolve automatic ones and check all references."""
        from esphome.cpp_generator import MockObjClass
        from esphome.cpp_types import Component

        if result.errors:
            # If result already has errors, skip this step
            # Otherwise the user will get a bunch of missing ID warnings
            # because the component that did not validate doesn't have any IDs set
            return

        # IDs that reference (rather than declare) something, checked below
        searching_ids: list[tuple[core.ID, ConfigPath]] = []
        for id, path in iter_ids(result):
            if id.is_declaration:
                if id.id is not None:
                    # Look for duplicate definitions
                    match = next(
                        (v for v in result.declare_ids if v[0].id == id.id), None
                    )
                    if match is not None:
                        opath = "->".join(str(v) for v in match[1])
                        result.add_str_error(
                            f"ID {id.id} redefined! Check {opath}", path
                        )
                        continue
                result.declare_ids.append((id, path))
            else:
                searching_ids.append((id, path))
        # Resolve default ids after manual IDs
        for id, _ in result.declare_ids:
            # The list of taken names is rebuilt on every iteration
            id.resolve([v[0].id for v in result.declare_ids])
            if isinstance(id.type, MockObjClass) and id.type.inherits_from(Component):
                CORE.component_ids.add(id.id)

        # Check searched IDs
        for id, path in searching_ids:
            if id.id is not None:
                # manually declared
                match = next(
                    (v[0] for v in result.declare_ids if v[0].id == id.id), None
                )
                if match is None or not match.is_manual:
                    # No declared ID with this name
                    import difflib

                    error = (
                        f"Couldn't find ID '{id.id}'. Please check you have defined "
                        "an ID with that name in your configuration."
                    )
                    # Find candidates
                    matches = difflib.get_close_matches(
                        id.id, [v[0].id for v in result.declare_ids if v[0].is_manual]
                    )
                    if matches:
                        matches_s = ", ".join(f'"{x}"' for x in matches)
                        error += f" These IDs look similar: {matches_s}."
                    result.add_str_error(error, path)
                    continue
                # The type check below only applies when both sides are MockObjClass
                if not isinstance(match.type, MockObjClass) or not isinstance(
                    id.type, MockObjClass
                ):
                    continue
                if not match.type.inherits_from(id.type):
                    result.add_str_error(
                        f"ID '{id.id}' of type {match.type} doesn't inherit from {id.type}. "
                        "Please double check your ID is pointing to the correct value",
                        path,
                    )

            if id.id is None and id.type is not None:
                # Automatic reference: look for declared IDs of a fitting type
                matches = []
                for v in result.declare_ids:
                    if v[0] is None or not isinstance(v[0].type, MockObjClass):
                        continue
                    inherits = v[0].type.inherits_from(id.type)
                    if inherits:
                        matches.append(v[0])

                if len(matches) == 0:
                    result.add_str_error(
                        f"Couldn't find any component that can be used for '{id.type}'. Are you missing a hub declaration?",
                        path,
                    )
                elif len(matches) == 1:
                    id.id = matches[0].id
                elif len(matches) > 1:
                    if str(id.type) == "time::RealTimeClock":
                        # For this type the first candidate is used instead
                        # of reporting an ambiguity
                        id.id = matches[0].id
                    else:
                        manual_declared_count = sum(1 for m in matches if m.is_manual)
                        if manual_declared_count > 0:
                            ids = ", ".join(
                                [f"'{m.id}'" for m in matches if m.is_manual]
                            )
                            result.add_str_error(
                                f"Too many candidates found for '{path[-1]}' type '{id.type}' {'Some are' if manual_declared_count > 1 else 'One is'} {ids}",
                                path,
                            )
                        else:
                            result.add_str_error(
                                f"Too many candidates found for '{path[-1]}' type '{id.type}' You must assign an explicit ID to the parent component you want to use.",
                                path,
                            )
class RemoveReferenceValidationStep(ConfigValidationStep):
    """
    Make sure all !remove references have been removed from the config.

    Any left overs mean the merge step couldn't find corresponding previously existing id/key
    """

    def run(self, result: Config) -> None:
        """Record an error for every ``Remove`` marker still in *result*."""
        if result.errors:
            # If result already has errors, skip this step
            return

        def leftover_remove_paths(node, path: ConfigPath = None):
            # Depth-first walk yielding the path of each Remove marker.
            path = path or []
            if isinstance(node, Remove):
                yield path
            elif isinstance(node, list):
                for index, element in enumerate(node):
                    yield from leftover_remove_paths(element, path + [index])
            elif isinstance(node, dict):
                for key, value in node.items():
                    yield from leftover_remove_paths(value, path + [key])

        for found in leftover_remove_paths(result):
            result.add_str_error(
                f"Source for removal at '{'->'.join([str(p) for p in found])}' was not found.",
                found,
            )
class FinalValidateValidationStep(ConfigValidationStep):
    """Run final_validate_schema for all components."""

    # Has to happen after ID pass validated
    priority = -20.0

    def __init__(self, path: ConfigPath, comp: ComponentManifest) -> None:
        self.path = path
        self.comp = comp

    def run(self, result: Config) -> None:
        """Run the component's final validation, if it defines one.

        The full config is published through ``fv.full_config`` while the
        schema runs. Validation errors are recorded on *result*.
        """
        if result.errors:
            # If result already has errors, skip this step
            return

        token = fv.full_config.set(result)
        try:
            conf = result.get_nested_item(self.path)
            with result.catch_error(self.path):
                if self.comp.final_validate_schema is not None:
                    self.comp.final_validate_schema(conf)
        finally:
            # Always restore the previous value, also when an unexpected
            # (non-validation) exception propagates.
            fv.full_config.reset(token)
class PinUseValidationCheck(ConfigValidationStep):
    """Check for pin reuse"""

    # Lower than the component final validations, so it runs after them.
    priority = -30

    def __init__(self) -> None:
        """This step takes no arguments and keeps no state."""

    def run(self, result: Config) -> None:
        """Run the pin registry's final validation unless errors already exist."""
        if not result.errors:
            pins.PIN_SCHEMA_REGISTRY.final_validate(result)
def validate_config(
    config: dict[str, Any], command_line_substitutions: dict[str, Any]
) -> Config:
    """Validate a raw configuration and return the validation result.

    The passes run in order: packages, substitutions, the REPLACEME check,
    external components, the partial core config, the target platform
    components, and finally all remaining components plus the global
    ID / pin / ``!remove`` checks.

    Args:
        config: The raw configuration as loaded from YAML.
        command_line_substitutions: Substitutions given on the command
            line; they override same-named entries from the config.

    Returns:
        A ``Config`` holding the validated config. Problems are collected
        in its ``errors`` list rather than raised.
    """
    result = Config()

    loader.clear_component_meta_finders()
    loader.install_custom_components_meta_finder()

    # 0. Load packages
    if CONF_PACKAGES in config:
        from esphome.components.packages import do_packages_pass

        result.add_output_path([CONF_PACKAGES], CONF_PACKAGES)
        try:
            config = do_packages_pass(config)
        except vol.Invalid as err:
            result.update(config)
            result.add_error(err)
            return result

    CORE.raw_config = config

    # 1. Load substitutions
    if CONF_SUBSTITUTIONS in config or command_line_substitutions:
        from esphome.components import substitutions

        result[CONF_SUBSTITUTIONS] = {
            **config.get(CONF_SUBSTITUTIONS, {}),
            **command_line_substitutions,
        }
        result.add_output_path([CONF_SUBSTITUTIONS], CONF_SUBSTITUTIONS)
        try:
            # Run the pass exactly once: a second pass would substitute
            # again inside values that were already substituted.
            substitutions.do_substitution_pass(config, command_line_substitutions)
        except vol.Invalid as err:
            result.add_error(err)
            return result

    CORE.raw_config = config

    # 1.1. Check for REPLACEME special value
    try:
        recursive_check_replaceme(config)
    except vol.Invalid as err:
        # Record the error but keep going
        result.add_error(err)

    # 1.2. Load external_components
    if CONF_EXTERNAL_COMPONENTS in config:
        from esphome.components.external_components import do_external_components_pass

        result.add_output_path([CONF_EXTERNAL_COMPONENTS], CONF_EXTERNAL_COMPONENTS)
        try:
            do_external_components_pass(config)
        except vol.Invalid as err:
            result.update(config)
            result.add_error(err)
            return result

    if "esphomeyaml" in config:
        _LOGGER.warning(
            "The esphomeyaml section has been renamed to esphome in 1.11.0. "
            "Please replace 'esphomeyaml:' in your configuration with 'esphome:'."
        )
        config[CONF_ESPHOME] = config.pop("esphomeyaml")

    if CONF_ESPHOME not in config:
        result.add_str_error(
            "'esphome' section missing from configuration. Please make sure "
            "your configuration has an 'esphome:' line in it.",
            [],
        )
        return result

    # 2. Load partial core config
    result[CONF_ESPHOME] = config[CONF_ESPHOME]
    result.add_output_path([CONF_ESPHOME], CONF_ESPHOME)
    try:
        core_config.preload_core_config(config, result)
    except vol.Invalid as err:
        result.add_error(err)
        return result
    # Remove temporary esphome config path again, it will be reloaded later
    result.remove_output_path([CONF_ESPHOME], CONF_ESPHOME)

    # First run platform validation steps
    for key in TARGET_PLATFORMS:
        if key in config:
            result.add_validation_step(LoadValidationStep(key, config[key]))
    result.run_validation_steps()

    if result.errors:
        # do not try to validate further as we don't know what the target is
        return result

    for domain, conf in config.items():
        result.add_validation_step(LoadValidationStep(domain, conf))
    result.add_validation_step(IDPassValidationStep())
    result.add_validation_step(PinUseValidationCheck())
    result.add_validation_step(RemoveReferenceValidationStep())

    result.run_validation_steps()

    return result
def humanize_error(config, validation_error):
    """Return the error as a stripped, period-terminated message.

    Everything from voluptuous' trailing ``@ data[...]`` location (and an
    optional preceding ``for dictionary value``) onwards is dropped.
    *config* is accepted but not used.
    """
    text = str(validation_error)
    location_match = re.match(
        r"^(.*?)\s*(?:for dictionary value )?@ data\[.*$", text, re.DOTALL
    )
    if location_match is not None:
        text = location_match.group(1)
    text = text.strip()
    return text if text.endswith(".") else text + "."
def _get_parent_name(path, config):
if not path:
return "<root>"
for domain_path, domain in config.output_paths:
if _path_begins_with(path, domain_path):
if len(path) > len(domain_path):
# Sub-item
break
return domain
# When processing a list, skip back over the index
while len(path) > 1 and isinstance(path[-1], int):
path = path[:-1]
return path[-1]
def _format_vol_invalid(ex: vol.Invalid, config: Config) -> str:
    """Turn a voluptuous error into a user-facing message.

    Unknown-key and missing-required-key errors get dedicated wording that
    names the offending option and its parent section; every other error
    goes through ``humanize_error``.
    """
    parent = _get_parent_name(ex.path[:-1], config)

    if isinstance(ex, ExtraKeysInvalid):
        if ex.candidates:
            return f"[{ex.path[-1]}] is an invalid option for [{parent}]. Did you mean {', '.join(f'[{x}]' for x in ex.candidates)}?"
        return f"[{ex.path[-1]}] is an invalid option for [{parent}]. Please check the indentation."
    if "extra keys not allowed" in str(ex):
        return f"[{ex.path[-1]}] is an invalid option for [{parent}]."
    if isinstance(ex, vol.RequiredFieldInvalid):
        if ex.msg == "required key not provided":
            return f"'{ex.path[-1]}' is a required option for [{parent}]."
        # Required has set a custom error message
        return "" + ex.msg
    return "" + humanize_error(config, ex)
class InvalidYAMLError(EsphomeError):
    """Raised when the configuration file cannot be loaded as YAML.

    The original exception stays available as ``base_exc``.
    """

    def __init__(self, base_exc):
        try:
            description = str(base_exc)
        except UnicodeDecodeError:
            # Fall back to repr() when the message cannot be decoded
            description = repr(base_exc)
        super().__init__(f"Invalid YAML syntax:\n\n{description}")
        self.base_exc = base_exc
def _load_config(command_line_substitutions: dict[str, Any]) -> Config:
    """Load the configuration file at ``CORE.config_path`` and validate it.

    Raises:
        InvalidYAMLError: if loading the YAML file fails with an EsphomeError.
    """
    try:
        raw_config = yaml_util.load_yaml(CORE.config_path)
    except EsphomeError as err:
        raise InvalidYAMLError(err) from err

    try:
        return validate_config(raw_config, command_line_substitutions)
    except Exception as err:
        # EsphomeErrors pass through untouched; anything else is logged first.
        if not isinstance(err, EsphomeError):
            _LOGGER.error("Unexpected exception while reading configuration:")
        raise
def load_config(command_line_substitutions: dict[str, Any]) -> Config:
    """Load and validate the configuration.

    Raises:
        EsphomeError: also for a ``vol.Invalid`` escaping validation, which
            is wrapped with the original error chained as its cause.
    """
    try:
        loaded = _load_config(command_line_substitutions)
    except vol.Invalid as err:
        raise EsphomeError(f"Error while parsing config: {err}") from err
    return loaded
def line_info(config, path, highlight=True):
    """Return a coloured ``[source <document>:<line>]`` marker for *path*.

    Returns None when *highlight* is false, and the string ``"None"`` when
    no document range is known for the path.
    """
    if not highlight:
        return None
    doc_range = config.get_deepest_document_range_for_path(path)
    if not doc_range:
        return "None"
    start = doc_range.start_mark
    # start.line is incremented by one for display
    return color(Fore.CYAN, f"[source {start.document}:{start.line + 1}]")
def _print_on_next_line(obj):
if isinstance(obj, (list, tuple, dict)):
return True
if isinstance(obj, str):
return len(obj) > 80
if isinstance(obj, core.Lambda):
return len(obj.value) > 80
return False
def dump_dict(
    config: Config, path: ConfigPath, at_root: bool = True
) -> tuple[str, bool]:
    """Render the config fragment at *path* as YAML-like text.

    Error messages are printed above the item they belong to, and items on
    an error path are coloured and annotated with their source location.
    Errors printed here are removed from ``config.errors``, because
    ``get_error_for_path`` pops them.

    Returns:
        ``(text, multiline)``, where *multiline* tells the caller to put
        the text on its own indented lines below the key.
    """
    conf = config.get_nested_item(path)
    ret = ""
    multiline = False

    if at_root:
        # Nested calls have their error fetched by the parent (see below)
        error = config.get_error_for_path(path)
        if error is not None:
            ret += f"\n{color(Fore.BOLD_RED, _format_vol_invalid(error, config))}\n"

    if isinstance(conf, (list, tuple)):
        multiline = True
        if not conf:
            ret += "[]"
            multiline = False

        for i in range(len(conf)):
            path_ = path + [i]
            error = config.get_error_for_path(path_)
            if error is not None:
                ret += f"\n{color(Fore.BOLD_RED, _format_vol_invalid(error, config))}\n"

            sep = "- "
            if config.is_in_error_path(path_):
                sep = color(Fore.RED, sep)
            msg, _ = dump_dict(config, path_, at_root=False)
            msg = indent(msg)
            inf = line_info(config, path_, highlight=config.is_in_error_path(path_))
            if inf is not None:
                msg = f"{inf}\n{msg}"
            elif msg:
                # Drop the first two characters so the item follows "- "
                msg = msg[2:]
            ret += f"{sep + msg}\n"
    elif isinstance(conf, dict):
        multiline = True
        if not conf:
            ret += "{}"
            multiline = False

        for k in conf.keys():
            path_ = path + [k]
            error = config.get_error_for_path(path_)
            if error is not None:
                ret += f"\n{color(Fore.BOLD_RED, _format_vol_invalid(error, config))}\n"

            st = f"{k}: "
            if config.is_in_error_path(path_):
                st = color(Fore.RED, st)
            msg, m = dump_dict(config, path_, at_root=False)

            inf = line_info(config, path_, highlight=config.is_in_error_path(path_))
            if m:
                msg = f"\n{indent(msg)}"

            if inf is not None:
                # Multi-line values get the source info right after the key,
                # single-line values get it after the value
                if m:
                    msg = f" {inf}{msg}"
                else:
                    msg = f"{msg} {inf}"
            ret += f"{st + msg}\n"
    elif isinstance(conf, str):
        if is_secret(conf):
            # Print the "!secret" reference instead of the value
            conf = f"!secret {is_secret(conf)}"
        if not conf:
            conf += "''"

        if len(conf) > 80:
            # Long strings are printed as a YAML block scalar
            conf = f"|-\n{indent(conf)}"
        error = config.get_error_for_path(path)
        col = Fore.BOLD_RED if error else Fore.KEEP
        ret += color(col, str(conf))
    elif isinstance(conf, core.Lambda):
        if is_secret(conf):
            # NOTE(review): this replaces conf with a str, so the
            # `conf.value` access below would fail if this branch is
            # ever taken - confirm whether it is reachable.
            conf = f"!secret {is_secret(conf)}"

        conf = f"!lambda |-\n{indent(str(conf.value))}"
        error = config.get_error_for_path(path)
        col = Fore.BOLD_RED if error else Fore.KEEP
        ret += color(col, conf)
    elif conf is None:
        pass
    else:
        error = config.get_error_for_path(path)
        col = Fore.BOLD_RED if error else Fore.KEEP
        ret += color(col, str(conf))
        multiline = "\n" in ret

    return ret, multiline
def strip_default_ids(config):
    """Recursively remove non-manual IDs and AutoLoad markers, in place.

    Lists and dicts are modified in place and returned; any other value is
    returned unchanged.
    """

    def is_droppable(item):
        # Non-manual IDs and AutoLoad markers are dropped.
        return (isinstance(item, core.ID) and not item.is_manual) or isinstance(
            item, core.AutoLoad
        )

    if isinstance(config, list):
        doomed_items = []
        for index, item in enumerate(config):
            item = strip_default_ids(item)
            config[index] = item
            if is_droppable(item):
                doomed_items.append(item)
        # Removal happens after the loop so the list is not resized mid-iteration
        for item in doomed_items:
            config.remove(item)
    elif isinstance(config, dict):
        doomed_keys = []
        for key, value in config.items():
            value = strip_default_ids(value)
            config[key] = value
            if is_droppable(value):
                doomed_keys.append(key)
        for key in doomed_keys:
            config.pop(key)
    return config
def read_config(command_line_substitutions):
    """Load and validate the configuration, printing any errors.

    Returns:
        The validated config, or None when loading failed or validation
        produced errors (which are then printed via ``safe_print``).
    """
    _LOGGER.info("Reading configuration %s...", CORE.config_path)
    try:
        res = load_config(command_line_substitutions)
    except EsphomeError as err:
        _LOGGER.error("Error while reading config: %s", err)
        return None

    if not res.errors:
        return res

    if not CORE.verbose:
        res = strip_default_ids(res)

    safe_print(color(Fore.BOLD_RED, "Failed config"))
    safe_print("")
    for path, domain in res.output_paths:
        if not res.is_in_error_path(path):
            continue

        header = color(Fore.BOLD_RED, f"{domain}:")
        location = line_info(res, path)
        if location:
            header += f" {location}"
        safe_print(header)

        dump_lines = dump_dict(res, path)[0].splitlines()
        # find the last error message
        last = len(dump_lines) - 1
        while last > 10 and "\033[" not in dump_lines[last]:
            last -= 1
        # discard lines more than 4 beyond the last error
        cutoff = min(last + 4, len(dump_lines))
        safe_print(indent("\n".join(dump_lines[:cutoff])))

    # Print whatever errors were not consumed while dumping the sections
    for err in res.errors:
        safe_print(color(Fore.BOLD_RED, err.msg))
        safe_print("")

    return None