added AES support, hardened connection setup

This commit is contained in:
patman15
2024-08-27 21:55:26 +02:00
parent 9a03567f70
commit 8e929f191e
6 changed files with 156 additions and 79 deletions

View File

@@ -4,20 +4,28 @@ import asyncio
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
import time import time
from typing import Final
from bleak import BleakClient from bleak import BleakClient
from bleak.backends.device import BLEDevice from bleak.backends.device import BLEDevice
from bleak.exc import BleakError from bleak.exc import BleakError
from bleak.uuids import normalize_uuid_str from bleak.uuids import normalize_uuid_str
from bleak_retry_connector import establish_connection
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from homeassistant.components.cover import ATTR_CURRENT_POSITION from homeassistant.components.cover import ATTR_CURRENT_POSITION
from .const import LOGGER, TIMEOUT, UUID from .const import LOGGER, TIMEOUT
ATTR_ACTIVITY = "activity" UUID_COV_SERVICE: Final = normalize_uuid_str("fdc1")
UUID_TX: Final = "cafe1001-c0ff-ee01-8000-a110ca7ab1e0"
UUID_DEV_SERVICE: Final = normalize_uuid_str("180a")
UUID_BAT_SERVICE: Final = normalize_uuid_str("180f")
ATTR_ACTIVITY: Final = "activity"
SHADE_TYPE: dict[int, str] = { SHADE_TYPE: Final[dict[int, str]] = {
1: "Designer Roller", 1: "Designer Roller",
4: "Roman", 4: "Roman",
5: "Bottom Up", 5: "Bottom Up",
@@ -33,10 +41,10 @@ SHADE_TYPE: dict[int, str] = {
84: "Vignette", 84: "Vignette",
} }
OPEN_POSITION = 100 OPEN_POSITION: Final = 100
CLOSED_POSITION = 0 CLOSED_POSITION: Final = 0
POWER_LEVELS: dict[int, int] = { POWER_LEVELS: Final[dict[int, int]] = {
4: 100, # 4 is hardwired 4: 100, # 4 is hardwired
3: 100, # 3 = 100% to 51% power remaining 3: 100, # 3 = 100% to 51% power remaining
2: 50, # 2 = 50% to 21% power remaining 2: 50, # 2 = 50% to 21% power remaining
@@ -69,20 +77,24 @@ class DeviceInfo:
class PowerViewBLE: class PowerViewBLE:
"""Class to handle connection to PowerView remote device.""" """Class to handle connection to PowerView remote device."""
UUID_COV_SERVICE = UUID def __init__(self, ble_device: BLEDevice, home_key: bytes = b"") -> None:
UUID_TX = "cafe1001-c0ff-ee01-8000-a110ca7ab1e0"
UUID_DEV_SERVICE = normalize_uuid_str("180a")
UUID_BAT_SERVICE = normalize_uuid_str("180f")
def __init__(self, ble_device: BLEDevice) -> None:
"""Initialize device API via Bluetooth.""" """Initialize device API via Bluetooth."""
self._ble_device: BLEDevice = ble_device self._ble_device: Final[BLEDevice] = ble_device
self.name = self._ble_device.name self.name: Final[str] = self._ble_device.name or "unknown"
self.seqcnt: int = 1 self._seqcnt: int = 1
self._client: BleakClient | None = None self._client: BleakClient | None = None
self._data_event = asyncio.Event() self._data_event = asyncio.Event()
self._data: bytearray self._data: bytearray
self._info: DeviceInfo = DeviceInfo() self._info: DeviceInfo = DeviceInfo()
# self._connect_lock = (
# asyncio.Lock()
# ) # TODO: try get rid of (device_info vs. normal cmds)
self._cmd_lock = asyncio.Lock()
self._cipher: Final = (
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
if len(home_key) == 16
else None
)
async def _wait_event(self) -> None: async def _wait_event(self) -> None:
await self._data_event.wait() await self._data_event.wait()
@@ -100,31 +112,41 @@ class PowerViewBLE:
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len # general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
async def _cmd(self, cmd: ShadeCmd, data: bytearray) -> None: async def _cmd(self, cmd: ShadeCmd, data: bytearray) -> None:
try: # if self._cmd_lock.locked():
await self._connect()
assert self._client is not None, "missing BT client" # return
tx_data = (
bytearray( async with self._cmd_lock:
int.to_bytes(cmd.value, 2, byteorder="little")
+ bytes([self.seqcnt, len(data)])
)
+ data
)
self._data_event.clear()
LOGGER.debug("sending cmd: %s", tx_data)
await self._client.write_gatt_char(self.UUID_TX, tx_data, False)
self.seqcnt += 1
LOGGER.debug("waiting for response")
try: try:
await asyncio.wait_for(self._wait_event(), timeout=TIMEOUT) await self._connect()
self._verify_response(self._data, self.seqcnt - 1, cmd) assert self._client is not None, "missing BT client"
except TimeoutError as ex: tx_data = (
raise TimeoutError("device operation timed out") from ex bytearray(
finally: int.to_bytes(cmd.value, 2, byteorder="little")
await self._client.disconnect() + bytes([self._seqcnt, len(data)])
except Exception as ex: )
LOGGER.debug("Error: %s - %s", type(ex).__name__, ex) + data
raise )
if self._cipher is not None:
enc = self._cipher.encryptor()
tx_data = enc.update(tx_data) + enc.finalize()
self._data_event.clear()
LOGGER.debug("sending cmd: %s", tx_data)
await self._client.write_gatt_char(UUID_TX, tx_data, False)
self._seqcnt += 1
LOGGER.debug("waiting for response")
try:
await asyncio.wait_for(
self._wait_event(), timeout=TIMEOUT
) # TODO: how long to wait?!
self._verify_response(self._data, self._seqcnt - 1, cmd)
except TimeoutError as ex:
raise TimeoutError("Device did not send confirmation.") from ex
# finally:
# await self._client.disconnect() # device disconnects itself
except Exception as ex:
LOGGER.debug("Error: %s - %s", type(ex).__name__, ex)
raise
@staticmethod @staticmethod
def dec_manufacturer_data(data: bytearray) -> list[tuple[str, float]]: def dec_manufacturer_data(data: bytearray) -> list[tuple[str, float]]:
@@ -163,8 +185,8 @@ class PowerViewBLE:
await self._cmd(ShadeCmd.STOP, bytearray(b"")) await self._cmd(ShadeCmd.STOP, bytearray(b""))
# uint8_t scene#, uint8_t unknown # uint8_t scene#, uint8_t unknown
# open: scene 2 # open: scene 2 (programmed by app?)
# close: scene 3 # close: scene 3 (programmed by app?)
async def activate_scene(self, idx: int) -> None: async def activate_scene(self, idx: int) -> None:
"""Activate stored scene.""" """Activate stored scene."""
LOGGER.debug("%s set scene #%i", self.name, idx) LOGGER.debug("%s set scene #%i", self.name, idx)
@@ -173,8 +195,12 @@ class PowerViewBLE:
bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])), bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])),
) )
def _verify_response(self, data: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool: def _verify_response(self, input: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
"""Verify shade response data.""" """Verify shade response data."""
data = input
if self._cipher is not None:
dec = self._cipher.decryptor()
data = dec.update(input) + dec.finalize()
if len(data) < 4: if len(data) < 4:
LOGGER.warning("Message too short") LOGGER.warning("Message too short")
return False return False
@@ -195,7 +221,7 @@ class PowerViewBLE:
async def query_dev_info(self) -> dict[str, str]: async def query_dev_info(self) -> dict[str, str]:
"""Return detailed device information.""" """Return detailed device information."""
data: dict[str, str] = {} data: dict[str, str] = {}
uuids: dict[str, str] = { uuids: Final[dict[str, str]] = {
"manufacturer": "2a29", "manufacturer": "2a29",
"model": "2a24", "model": "2a24",
"serial_nr": "2a25", "serial_nr": "2a25",
@@ -236,26 +262,40 @@ class PowerViewBLE:
async def _connect(self) -> None: async def _connect(self) -> None:
"""Connect to the device and setup notification if not connected.""" """Connect to the device and setup notification if not connected."""
if not self.is_connected: LOGGER.debug("Connecting %s", self.name)
LOGGER.debug("Connecting %s", self.name)
start = time.time()
if not isinstance(self._client, BleakClient):
self._client = BleakClient(
self._ble_device,
disconnected_callback=self._on_disconnect,
services=[
self.UUID_COV_SERVICE,
# self.UUID_DEV_SERVICE,
# self.UUID_BAT_SERVICE,
],
)
await self._client.connect() # dangerous_use_bleak_cache = True
LOGGER.debug("\tconnect took %i", time.time() - start)
await self._client.start_notify(self.UUID_TX, self._notification_handler)
# await self._query_dev_info()
else: if self.is_connected:
LOGGER.debug("%s already connected", self.name) LOGGER.debug("%s already connected", self.name)
return
start = time.time()
# if not isinstance(self._client, BleakClient):
# self._client = BleakClient(
# self._ble_device,
# disconnected_callback=self._on_disconnect,
# services=[
# UUID_COV_SERVICE,
# # self.UUID_DEV_SERVICE,
# # self.UUID_BAT_SERVICE,
# ],
# )
self._client = await establish_connection(
BleakClient,
self._ble_device,
self.name,
disconnected_callback=self._on_disconnect,
services=[
UUID_COV_SERVICE,
# self.UUID_DEV_SERVICE,
# self.UUID_BAT_SERVICE,
],
)
await self._client.start_notify(UUID_TX, self._notification_handler)
# self._client.connect() # dangerous_use_bleak_cache = True
LOGGER.debug("\tconnect took %i", time.time() - start)
# await self._query_dev_info()
async def _disconnect(self) -> None: async def _disconnect(self) -> None:
"""Disconnect the device and stop notifications.""" """Disconnect the device and stop notifications."""

View File

@@ -16,7 +16,8 @@ from homeassistant.const import CONF_ADDRESS
# from homeassistant.helpers.device_registry import format_mac # from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
from .const import DOMAIN, LOGGER, MFCT_ID, UUID from .api import UUID_COV_SERVICE as UUID
from .const import DOMAIN, LOGGER, MFCT_ID
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):

View File

@@ -1,8 +1,9 @@
"""Constants for the BLE Battery Management System integration.""" """Constants for the BLE Battery Management System integration."""
import logging import logging
from typing import Final
from bleak.uuids import normalize_uuid_str # from bleak.uuids import normalize_uuid_str
# from homeassistant.const import ( # noqa: F401 # from homeassistant.const import ( # noqa: F401
# ATTR_BATTERY_CHARGING, # ATTR_BATTERY_CHARGING,
@@ -12,11 +13,12 @@ from bleak.uuids import normalize_uuid_str
# ) # )
DOMAIN = "hunterdouglas_powerview_ble" DOMAIN: Final = "hunterdouglas_powerview_ble"
LOGGER = logging.getLogger(__package__) LOGGER: Final = logging.getLogger(__package__)
UUID = normalize_uuid_str("fdc1") MFCT_ID: Final = 2073
MFCT_ID = 2073 TIMEOUT: Final = 15
TIMEOUT = 15 HOME_KEY: Final = b""
# attributes (do not change) # attributes (do not change)
ATTR_RSSI = "rssi" ATTR_RSSI = "rssi"

View File

@@ -1,5 +1,6 @@
"""Home Assistant coordinator for Hunter Douglas PowerView (BLE) integration.""" """Home Assistant coordinator for Hunter Douglas PowerView (BLE) integration."""
import asyncio
from typing import Any from typing import Any
from bleak.backends.device import BLEDevice from bleak.backends.device import BLEDevice
@@ -13,7 +14,7 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import CONNECTION_BLUETOOTH, DeviceInfo from homeassistant.helpers.device_registry import CONNECTION_BLUETOOTH, DeviceInfo
from .api import SHADE_TYPE, PowerViewBLE from .api import SHADE_TYPE, PowerViewBLE
from .const import ATTR_RSSI, DOMAIN, LOGGER from .const import ATTR_RSSI, DOMAIN, HOME_KEY, LOGGER
class PVCoordinator(PassiveBluetoothDataUpdateCoordinator): class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
@@ -25,10 +26,11 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
"""Initialize BMS data coordinator.""" """Initialize BMS data coordinator."""
assert ble_device.name is not None assert ble_device.name is not None
self._mac = ble_device.address self._mac = ble_device.address
self.api = PowerViewBLE(ble_device) self.api = PowerViewBLE(ble_device, HOME_KEY)
self.data: dict[str, int | float | bool] = {} self.data: dict[str, int | float | bool] = {}
self._manuf_dat = data.get("manufacturer_data") self._manuf_dat = data.get("manufacturer_data")
self.dev_details: dict[str, str] = {} self.dev_details: dict[str, str] = {}
self._dev_info_task: asyncio.Task | None = None
LOGGER.debug( LOGGER.debug(
"Initializing coordinator for %s (%s)", "Initializing coordinator for %s (%s)",
@@ -45,11 +47,14 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
async def _get_device_info(self) -> None: async def _get_device_info(self) -> None:
self.dev_details = await self.api.query_dev_info() self.dev_details = await self.api.query_dev_info()
@property @property
def device_info(self) -> DeviceInfo: def device_info(self) -> DeviceInfo:
"""Return detailed device information for GUI.""" """Return detailed device information for GUI."""
LOGGER.debug("device_info, %s", self.dev_details) LOGGER.debug("device_info, %s", self.dev_details)
if self._dev_info_task is None or not self._dev_info_task.done:
self._dev_info_task = self.hass.async_create_task(
self._get_device_info(), "query_device_details", False
)
return DeviceInfo( return DeviceInfo(
identifiers={ identifiers={
(DOMAIN, self.name), (DOMAIN, self.name),
@@ -73,6 +78,11 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
hw_version=self.dev_details.get("hw_rev"), hw_version=self.dev_details.get("hw_rev"),
) )
@property
def device_present(self) -> bool:
"""Check if a device is present."""
return bluetooth.async_address_present(self.hass, self._mac, connectable=True)
@callback @callback
def _async_handle_bluetooth_event( def _async_handle_bluetooth_event(
self, self,

View File

@@ -14,11 +14,12 @@ from homeassistant.components.cover import (
) )
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.device_registry import DeviceInfo, format_mac from homeassistant.helpers.device_registry import DeviceInfo, format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .api import CLOSED_POSITION, OPEN_POSITION from .api import CLOSED_POSITION, OPEN_POSITION
from .const import DOMAIN, LOGGER from .const import DOMAIN, HOME_KEY, LOGGER
from .coordinator import PVCoordinator from .coordinator import PVCoordinator
@@ -82,7 +83,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
@property @property
def supported_features(self) -> CoverEntityFeature: # type: ignore[reportIncompatibleVariableOverride] def supported_features(self) -> CoverEntityFeature: # type: ignore[reportIncompatibleVariableOverride]
"""Flag supported features, disable control if encryption is needed.""" """Flag supported features, disable control if encryption is needed."""
if self._coord.data.get("home_id") or self._coord.data.get("battery_charging"): if (self._coord.data.get("home_id") and len(HOME_KEY) != 16) or self._coord.data.get("battery_charging"):
return CoverEntityFeature(0) return CoverEntityFeature(0)
return super().supported_features return super().supported_features
@@ -103,19 +104,42 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
self._target_position = kwargs.get(ATTR_POSITION, None) self._target_position = kwargs.get(ATTR_POSITION, None)
if self._target_position is not None: if self._target_position is not None:
LOGGER.debug("set cover to position %i", self._target_position) LOGGER.debug("set cover to position %i", self._target_position)
await self._coord.api.set_position(self._target_position) if not self._coord.device_present:
LOGGER.debug("device not present")
return
try:
await self._coord.api.set_position(self._target_position)
except Exception as err:
raise HomeAssistantError(
f"Failed to move cover '{self.name}' to {self._target_position}%: {err}"
) from err
async def async_open_cover(self, **kwargs: Any) -> None: async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the cover.""" """Open the cover."""
LOGGER.debug("open cover") LOGGER.debug("open cover")
await self._coord.api.set_position(OPEN_POSITION) try:
await self._coord.api.set_position(OPEN_POSITION)
except Exception as err:
raise HomeAssistantError(
f"Failed open cover '{self.name}': {err}"
) from err
async def async_close_cover(self, **kwargs: Any) -> None: async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the cover tilt.""" """Close the cover tilt."""
LOGGER.debug("close cover") LOGGER.debug("close cover")
await self._coord.api.set_position(CLOSED_POSITION) try:
await self._coord.api.set_position(CLOSED_POSITION)
except Exception as err:
raise HomeAssistantError(
f"Failed close cover '{self.name}': {err}"
) from err
async def async_stop_cover(self, **kwargs: Any) -> None: async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the cover.""" """Stop the cover."""
LOGGER.debug("stop cover") LOGGER.debug("stop cover")
await self._coord.api.stop() try:
await self._coord.api.stop()
except Exception as err:
raise HomeAssistantError(
f"Failed stop cover '{self.name}': {err}"
) from err

View File

@@ -16,6 +16,6 @@
"iot_class": "local_polling", "iot_class": "local_polling",
"issue_tracker": "https://github.com/patman15/hdpv_ble/issues", "issue_tracker": "https://github.com/patman15/hdpv_ble/issues",
"loggers": ["hunterdouglas_powerview_ble"], "loggers": ["hunterdouglas_powerview_ble"],
"requirements": [], "requirements": ["cryptography>=43.0.0"],
"version": 0.11 "version": 0.20
} }