esphome/esphome/components/canbus/__init__.py

142 lines
4.7 KiB
Python

import esphome.codegen as cg
import esphome.config_validation as cv
from esphome import automation
from esphome.core import CORE, coroutine
from esphome.const import CONF_ID, CONF_TRIGGER_ID, CONF_DATA
CODEOWNERS = ["@mvturnho", "@danielschramm"]
IS_PLATFORM_COMPONENT = True
CONF_CAN_ID = "can_id"
CONF_USE_EXTENDED_ID = "use_extended_id"
CONF_CANBUS_ID = "canbus_id"
CONF_BIT_RATE = "bit_rate"
CONF_ON_FRAME = "on_frame"
CONF_CANBUS_SEND = "canbus.send"
def validate_id(id_value, id_ext):
if not id_ext:
if id_value > 0x7FF:
raise cv.Invalid("Standard IDs must be 11 Bit (0x000-0x7ff / 0-2047)")
def validate_raw_data(value):
if isinstance(value, str):
return value.encode("utf-8")
if isinstance(value, list):
return cv.Schema([cv.hex_uint8_t])(value)
raise cv.Invalid(
"data must either be a string wrapped in quotes or a list of bytes"
)
canbus_ns = cg.esphome_ns.namespace("canbus")
CanbusComponent = canbus_ns.class_("CanbusComponent", cg.Component)
CanbusTrigger = canbus_ns.class_(
"CanbusTrigger",
automation.Trigger.template(cg.std_vector.template(cg.uint8)),
cg.Component,
)
CanSpeed = canbus_ns.enum("CAN_SPEED")
CAN_SPEEDS = {
"5KBPS": CanSpeed.CAN_5KBPS,
"10KBPS": CanSpeed.CAN_10KBPS,
"20KBPS": CanSpeed.CAN_20KBPS,
"31K25BPS": CanSpeed.CAN_31K25BPS,
"33KBPS": CanSpeed.CAN_33KBPS,
"40KBPS": CanSpeed.CAN_40KBPS,
"50KBPS": CanSpeed.CAN_50KBPS,
"80KBPS": CanSpeed.CAN_80KBPS,
"83K3BPS": CanSpeed.CAN_83K3BPS,
"95KBPS": CanSpeed.CAN_95KBPS,
"100KBPS": CanSpeed.CAN_100KBPS,
"125KBPS": CanSpeed.CAN_125KBPS,
"200KBPS": CanSpeed.CAN_200KBPS,
"250KBPS": CanSpeed.CAN_250KBPS,
"500KBPS": CanSpeed.CAN_500KBPS,
"1000KBPS": CanSpeed.CAN_1000KBPS,
}
CONFIG_SCHEMA = cv.Schema(
{
cv.GenerateID(): cv.declare_id(CanbusComponent),
cv.Required(CONF_CAN_ID): cv.int_range(min=0, max=0x1FFFFFFF),
cv.Optional(CONF_BIT_RATE, default="125KBPS"): cv.enum(CAN_SPEEDS, upper=True),
cv.Optional(CONF_USE_EXTENDED_ID, default=False): cv.boolean,
cv.Optional(CONF_ON_FRAME): automation.validate_automation(
{
cv.GenerateID(CONF_TRIGGER_ID): cv.declare_id(CanbusTrigger),
cv.GenerateID(CONF_CAN_ID): cv.int_range(min=0, max=0x1FFFFFFF),
cv.Optional(CONF_USE_EXTENDED_ID, default=False): cv.boolean,
}
),
}
).extend(cv.COMPONENT_SCHEMA)
@coroutine
def setup_canbus_core_(var, config):
validate_id(config[CONF_CAN_ID], config[CONF_USE_EXTENDED_ID])
yield cg.register_component(var, config)
cg.add(var.set_can_id([config[CONF_CAN_ID]]))
cg.add(var.set_use_extended_id([config[CONF_USE_EXTENDED_ID]]))
cg.add(var.set_bitrate(CAN_SPEEDS[config[CONF_BIT_RATE]]))
for conf in config.get(CONF_ON_FRAME, []):
can_id = conf[CONF_CAN_ID]
ext_id = conf[CONF_USE_EXTENDED_ID]
validate_id(can_id, ext_id)
trigger = cg.new_Pvariable(conf[CONF_TRIGGER_ID], var, can_id, ext_id)
yield cg.register_component(trigger, conf)
yield automation.build_automation(
trigger, [(cg.std_vector.template(cg.uint8), "x")], conf
)
@coroutine
def register_canbus(var, config):
if not CORE.has_id(config[CONF_ID]):
var = cg.new_Pvariable(config[CONF_ID], var)
yield setup_canbus_core_(var, config)
# Actions
@automation.register_action(
CONF_CANBUS_SEND,
canbus_ns.class_("CanbusSendAction", automation.Action),
cv.maybe_simple_value(
{
cv.GenerateID(CONF_CANBUS_ID): cv.use_id(CanbusComponent),
cv.Optional(CONF_CAN_ID): cv.int_range(min=0, max=0x1FFFFFFF),
cv.Optional(CONF_USE_EXTENDED_ID, default=False): cv.boolean,
cv.Required(CONF_DATA): cv.templatable(validate_raw_data),
},
key=CONF_DATA,
),
)
def canbus_action_to_code(config, action_id, template_arg, args):
validate_id(config[CONF_CAN_ID], config[CONF_USE_EXTENDED_ID])
var = cg.new_Pvariable(action_id, template_arg)
yield cg.register_parented(var, config[CONF_CANBUS_ID])
if CONF_CAN_ID in config:
can_id = yield cg.templatable(config[CONF_CAN_ID], args, cg.uint32)
cg.add(var.set_can_id(can_id))
use_extended_id = yield cg.templatable(
config[CONF_USE_EXTENDED_ID], args, cg.uint32
)
cg.add(var.set_use_extended_id(use_extended_id))
data = config[CONF_DATA]
if isinstance(data, bytes):
data = [int(x) for x in data]
if cg.is_template(data):
templ = yield cg.templatable(data, args, cg.std_vector.template(cg.uint8))
cg.add(var.set_data_template(templ))
else:
cg.add(var.set_data_static(data))
yield var