mirror of
https://github.com/esphome/esphome.git
synced 2026-06-24 01:48:54 +02:00
[core] Use CORE.is_* platform helpers in __main__ (#17144)
This commit is contained in:
@@ -52,11 +52,6 @@ from esphome.const import (
|
||||
CONF_WEB_SERVER,
|
||||
CONF_WIFI,
|
||||
ENV_NOGITIGNORE,
|
||||
KEY_CORE,
|
||||
KEY_TARGET_PLATFORM,
|
||||
PLATFORM_ESP32,
|
||||
PLATFORM_ESP8266,
|
||||
PLATFORM_RP2040,
|
||||
SECRETS_FILES,
|
||||
Toolchain,
|
||||
)
|
||||
@@ -359,7 +354,7 @@ def choose_upload_log_host(
|
||||
bootsel_permission_error = False
|
||||
if (
|
||||
purpose == Purpose.UPLOADING
|
||||
and CORE.data.get(KEY_CORE, {}).get(KEY_TARGET_PLATFORM) == PLATFORM_RP2040
|
||||
and CORE.is_rp2040
|
||||
and (picotool := _find_picotool()) is not None
|
||||
):
|
||||
bootsel = detect_rp2040_bootsel(picotool)
|
||||
@@ -406,7 +401,7 @@ def choose_upload_log_host(
|
||||
# Show helpful BOOTSEL instructions for RP2040 when no BOOTSEL device is found
|
||||
if (
|
||||
purpose == Purpose.UPLOADING
|
||||
and CORE.data.get(KEY_CORE, {}).get(KEY_TARGET_PLATFORM) == PLATFORM_RP2040
|
||||
and CORE.is_rp2040
|
||||
and not any(get_port_type(opt[1]) == PortType.BOOTSEL for opt in options)
|
||||
):
|
||||
if bootsel_permission_error:
|
||||
@@ -984,7 +979,7 @@ def upload_using_platformio(config: ConfigType, port: str) -> int:
|
||||
# RP2040 platform-raspberrypi build recipe expects firmware.bin.signed for
|
||||
# the upload target, but 'nobuild' skips the build phase that creates it.
|
||||
# Create it here so the upload doesn't fail.
|
||||
if CORE.data.get(KEY_CORE, {}).get(KEY_TARGET_PLATFORM) == PLATFORM_RP2040:
|
||||
if CORE.is_rp2040:
|
||||
idedata = toolchain.get_idedata(config)
|
||||
build_dir = Path(idedata.firmware_elf_path).parent
|
||||
firmware_bin = build_dir / "firmware.bin"
|
||||
@@ -1169,10 +1164,10 @@ def upload_program(
|
||||
check_permissions(host)
|
||||
|
||||
exit_code = 1
|
||||
if CORE.target_platform in (PLATFORM_ESP32, PLATFORM_ESP8266):
|
||||
if CORE.is_esp32 or CORE.is_esp8266:
|
||||
file = getattr(args, "file", None)
|
||||
exit_code = upload_using_esptool(config, host, file, args.upload_speed)
|
||||
elif CORE.target_platform == PLATFORM_RP2040 or CORE.is_libretiny:
|
||||
elif CORE.is_rp2040 or CORE.is_libretiny:
|
||||
exit_code = upload_using_platformio(config, host)
|
||||
# else: Unknown target platform, exit_code remains 1
|
||||
|
||||
@@ -1629,10 +1624,7 @@ def command_run(args: ArgsProtocol, config: ConfigType) -> int | None:
|
||||
|
||||
# After BOOTSEL upload, wait for a new serial port to appear
|
||||
# so it shows up in the log chooser
|
||||
if (
|
||||
successful_device is None
|
||||
and CORE.data.get(KEY_CORE, {}).get(KEY_TARGET_PLATFORM) == PLATFORM_RP2040
|
||||
):
|
||||
if successful_device is None and CORE.is_rp2040:
|
||||
_wait_for_serial_port(known_ports=pre_upload_ports)
|
||||
# If exactly one new serial port appeared, use it directly
|
||||
serial_ports = get_serial_ports()
|
||||
|
||||
@@ -159,9 +159,12 @@ def setup_core(
|
||||
CORE.config = config
|
||||
CORE.toolchain = Toolchain.PLATFORMIO
|
||||
|
||||
if platform is not None:
|
||||
CORE.data[KEY_CORE] = {}
|
||||
CORE.data[KEY_CORE][KEY_TARGET_PLATFORM] = platform
|
||||
# Production always populates CORE.data[KEY_CORE] before upload/logs run
|
||||
# (the platform validator sets it during read_config, and
|
||||
# StorageJSON.apply_to_core sets it on the cache fast path), so mirror
|
||||
# that here. Tests that exercise platform-specific behavior pass a
|
||||
# platform explicitly; the rest get a platform-agnostic None.
|
||||
CORE.data[KEY_CORE] = {KEY_TARGET_PLATFORM: platform}
|
||||
|
||||
if tmp_path is not None:
|
||||
CORE.config_path = str(tmp_path / f"{name}.yaml")
|
||||
@@ -1660,6 +1663,29 @@ def test_upload_program_serial_platformio_platforms(
|
||||
mock_upload_using_platformio.assert_called_once_with(config, device)
|
||||
|
||||
|
||||
@patch("esphome.__main__.importlib.import_module")
|
||||
def test_upload_program_serial_unknown_platform(
|
||||
mock_import: Mock,
|
||||
mock_get_port_type: Mock,
|
||||
mock_check_permissions: Mock,
|
||||
) -> None:
|
||||
"""Serial upload on an unsupported platform falls through to exit_code 1."""
|
||||
setup_core(platform="custom_platform")
|
||||
# Module has no upload_program handler, so the SERIAL branch is reached.
|
||||
mock_import.return_value = MagicMock(spec=[])
|
||||
mock_get_port_type.return_value = "SERIAL"
|
||||
|
||||
config = {}
|
||||
args = MockArgs()
|
||||
devices = ["/dev/ttyUSB0"]
|
||||
|
||||
exit_code, host = upload_program(config, args, devices)
|
||||
|
||||
assert exit_code == 1
|
||||
assert host is None
|
||||
mock_check_permissions.assert_called_once_with("/dev/ttyUSB0")
|
||||
|
||||
|
||||
def test_upload_using_platformio_creates_signed_bin_for_rp2040(
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
@@ -6350,6 +6376,44 @@ def test_command_run_defaults_subscribe_states_true(
|
||||
)
|
||||
|
||||
|
||||
def test_command_run_rp2040_bootsel_redetects_serial_port() -> None:
|
||||
"""After a BOOTSEL upload (no device) on RP2040, command_run waits for and
|
||||
picks up the newly enumerated serial port before showing logs."""
|
||||
setup_core(
|
||||
config={"logger": {}, CONF_API: {}, CONF_MDNS: {CONF_DISABLED: False}},
|
||||
platform=PLATFORM_RP2040,
|
||||
)
|
||||
|
||||
args = MockArgs()
|
||||
args.no_logs = False
|
||||
args.device = None
|
||||
|
||||
new_port = MockSerialPort("/dev/ttyACM0", "RP2040 Serial")
|
||||
|
||||
with (
|
||||
patch("esphome.__main__.write_cpp", return_value=0),
|
||||
patch("esphome.__main__.compile_program", return_value=0),
|
||||
patch(
|
||||
"esphome.__main__.choose_upload_log_host",
|
||||
side_effect=[[], ["/dev/ttyACM0"]],
|
||||
) as mock_choose,
|
||||
patch("esphome.__main__.upload_program", return_value=(0, None)),
|
||||
patch(
|
||||
"esphome.__main__.get_serial_ports",
|
||||
side_effect=[[], [new_port]],
|
||||
),
|
||||
patch("esphome.__main__._wait_for_serial_port") as mock_wait,
|
||||
patch("esphome.__main__.show_logs", return_value=0) as mock_show_logs,
|
||||
):
|
||||
result = command_run(args, CORE.config)
|
||||
|
||||
assert result == 0
|
||||
mock_wait.assert_called_once_with(known_ports=set())
|
||||
# The re-detected serial port is used as the preferred logging device.
|
||||
assert mock_choose.call_args_list[-1].kwargs["default"] == "/dev/ttyACM0"
|
||||
mock_show_logs.assert_called_once_with(CORE.config, args, ["/dev/ttyACM0"])
|
||||
|
||||
|
||||
def test_command_idedata_esp_idf_prints_json(capsys: CaptureFixture) -> None:
|
||||
"""Under the native ESP-IDF toolchain, idedata is emitted as JSON."""
|
||||
setup_core()
|
||||
|
||||
Reference in New Issue
Block a user