diff --git a/.gitignore b/.gitignore index b68bb3b..fbb1d23 100644 --- a/.gitignore +++ b/.gitignore @@ -152,3 +152,5 @@ cython_debug/ #.idea/ aes.py *.bak +emu/PV_BLE_cover/PV_BLE_cover.ino +img/ diff --git a/custom_components/hunterdouglas_powerview_ble/api.py b/custom_components/hunterdouglas_powerview_ble/api.py index 3567b3c..4c1500e 100644 --- a/custom_components/hunterdouglas_powerview_ble/api.py +++ b/custom_components/hunterdouglas_powerview_ble/api.py @@ -12,6 +12,10 @@ from bleak.exc import BleakError from bleak.uuids import normalize_uuid_str from bleak_retry_connector import establish_connection from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.ciphers.base import ( + AEADDecryptionContext, + AEADEncryptionContext, +) from homeassistant.components.cover import ATTR_CURRENT_POSITION from .const import LOGGER, TIMEOUT @@ -25,6 +29,7 @@ ATTR_ACTIVITY: Final[str] = "activity" SHADE_TYPE: Final[dict[int, str]] = { + # up down only 1: "Designer Roller", 4: "Roman", 5: "Bottom Up", @@ -38,6 +43,11 @@ SHADE_TYPE: Final[dict[int, str]] = { 52: "Banded Shades", 53: "Sonnette", 84: "Vignette", + # top down bottom up + 8: "Duette, Top Down Bottom Up", + 9: "Duette DuoLite, Top Down Bottom Up", + 33: "Duette Architella, Top Down Bottom Up", + 47: "Pleated, Top Down Bottom Up", } OPEN_POSITION: Final[int] = 100 @@ -91,11 +101,11 @@ class PowerViewBLE: ], ) self._data_event = asyncio.Event() - self._data: bytearray + self._data: bytearray = bytearray() self._info: PVDeviceInfo = PVDeviceInfo() self._cmd_lock: Final = asyncio.Lock() - self._cmd_next = None - self._cipher: Final = ( + self._cmd_next: tuple[ShadeCmd, bytes] + self._cipher: Final[Cipher | None] = ( Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16))) if len(home_key) == 16 else None @@ -117,7 +127,7 @@ class PowerViewBLE: # general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len async def _cmd( - self, cmd: tuple[ShadeCmd, bytearray], disconnect: bool = True + self, cmd: tuple[ShadeCmd, bytes], disconnect: bool = True ) -> None: self._cmd_next = cmd if self._cmd_lock.locked(): @@ -127,16 +137,14 @@ class PowerViewBLE: async with self._cmd_lock: try: await self._connect() - cmd_run = self._cmd_next - tx_data = ( - bytearray( - int.to_bytes(cmd_run[0].value, 2, byteorder="little") - + bytes([self._seqcnt, len(cmd_run[1])]) - ) + cmd_run: tuple[ShadeCmd, bytes] = self._cmd_next + tx_data: bytes = bytes( + int.to_bytes(cmd_run[0].value, 2, byteorder="little") + + bytes([self._seqcnt, len(cmd_run[1])]) + cmd_run[1] ) if self._cipher is not None: - enc = self._cipher.encryptor() + enc: AEADEncryptionContext = self._cipher.encryptor() tx_data = enc.update(tx_data) + enc.finalize() self._data_event.clear() LOGGER.debug("sending cmd: %s", tx_data) @@ -161,8 +169,8 @@ class PowerViewBLE: if len(data) != 9: LOGGER.debug("not a V2 record!") return [] - pos = int.from_bytes(data[3:5], byteorder="little") - pos2 = (int(data[5]) << 4) + (int(data[4]) >> 4) + pos: int = int.from_bytes(data[3:5], byteorder="little") + pos2: int = (int(data[5]) << 4) + (int(data[4]) >> 4) return [ (ATTR_CURRENT_POSITION, ((pos >> 2) / 10)), ("position2", pos2 >> 2), @@ -185,7 +193,7 @@ class PowerViewBLE: await self._cmd( ( ShadeCmd.SET_POSITION, - bytearray( + bytes( int.to_bytes(value * 100, 2, byteorder="little") + bytes([0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x0]) ), @@ -201,7 +209,7 @@ class PowerViewBLE: async def stop(self) -> None: """Stop device movement.""" LOGGER.debug("%s stop", self.name) - await self._cmd((ShadeCmd.STOP, bytearray(b""))) + await self._cmd((ShadeCmd.STOP, b"")) async def close(self) -> None: """Fully close cover.""" @@ -217,7 +225,7 @@ class PowerViewBLE: await self._cmd( ( ShadeCmd.ACTIVATE_SCENE, - bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])), + int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2]), ), ) @@ -225,7 +233,7 @@ class PowerViewBLE: """Verify shade response data.""" data: bytearray = din if self._cipher is not None: - dec = self._cipher.decryptor() + dec: AEADDecryptionContext = self._cipher.decryptor() data = bytearray(dec.update(din) + dec.finalize()) if len(data) < 4: LOGGER.error("Reponse message too short")