test the repair flow

This commit is contained in:
farmio 2024-04-17 23:45:10 +02:00
parent 50f2910131
commit cd1e4b5749
1 changed files with 38 additions and 4 deletions

View File

@ -1,24 +1,31 @@
"""Test repairs for KNX integration."""
from homeassistant.components.knx.const import KNX_ADDRESS
from http import HTTPStatus
from homeassistant.components.knx.const import DOMAIN, KNX_ADDRESS
from homeassistant.components.knx.schema import NotifySchema
from homeassistant.components.notify import DOMAIN as NOTIFY_DOMAIN
from homeassistant.components.repairs import DOMAIN as REPAIRS_DOMAIN
from homeassistant.components.repairs.issue_handler import (
async_process_repairs_platforms,
)
from homeassistant.components.repairs.websocket_api import (
RepairsFlowIndexView,
RepairsFlowResourceView,
)
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from .conftest import KNXTestKit
from tests.typing import WebSocketGenerator
from tests.typing import ClientSessionGenerator, WebSocketGenerator
async def test_knx_notify_service_issue(
hass: HomeAssistant,
knx: KNXTestKit,
hass_client: ClientSessionGenerator,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test the legacy notify service still works before migration and repair flow is triggered."""
@ -35,9 +42,10 @@ async def test_knx_notify_service_issue(
await async_process_repairs_platforms(hass)
ws_client = await hass_ws_client(hass)
http_client = await hass_client()
# Assert no issue is present
await ws_client.send_json({"id": 1, "type": "repairs/list_issues"})
await ws_client.send_json_auto_id({"type": "repairs/list_issues"})
msg = await ws_client.receive_json()
assert msg["success"]
assert len(msg["result"]["issues"]) == 0
@ -56,7 +64,33 @@ async def test_knx_notify_service_issue(
)
# Assert the issue is present
await ws_client.send_json({"id": 2, "type": "repairs/list_issues"})
await ws_client.send_json_auto_id({"type": "repairs/list_issues"})
msg = await ws_client.receive_json()
assert msg["success"]
assert len(msg["result"]["issues"]) == 1
issue = msg["result"]["issues"][0]
assert issue["issue_id"] == "migrate_notify"
url = RepairsFlowIndexView.url
resp = await http_client.post(
url, json={"handler": DOMAIN, "issue_id": "migrate_notify"}
)
assert resp.status == HTTPStatus.OK
data = await resp.json()
flow_id = data["flow_id"]
assert data["step_id"] == "confirm"
url = RepairsFlowResourceView.url.format(flow_id=flow_id)
resp = await http_client.post(url)
assert resp.status == HTTPStatus.OK
data = await resp.json()
assert data["type"] == "create_entry"
# Test confirm step in repair flow
await hass.async_block_till_done()
# Assert the issue is no longer present
await ws_client.send_json_auto_id({"type": "repairs/list_issues"})
msg = await ws_client.receive_json()
assert msg["success"]
assert len(msg["result"]["issues"]) == 0