Clean-up code and upgrade dependencies (#20)
* Update pyproject.toml * stronger typing * fix type annotations * update dependencies * fix spelling * add missing info to pyproject.toml * code cleanup * add project URLs * fix mypy issues * update HA to 2025.11 * upgrade Python to 3.13.2 to match HA * Update lint.yml
This commit is contained in:
@@ -1,9 +1,9 @@
|
||||
"""Hunter Douglas PowerView BLE API."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
import time
|
||||
from typing import Final
|
||||
|
||||
from bleak import BleakClient
|
||||
@@ -12,7 +12,11 @@ from bleak.exc import BleakError
|
||||
from bleak.uuids import normalize_uuid_str
|
||||
from bleak_retry_connector import establish_connection
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.primitives.ciphers.base import CipherContext
|
||||
from cryptography.hazmat.primitives.ciphers.base import (
|
||||
AEADDecryptionContext,
|
||||
AEADEncryptionContext,
|
||||
)
|
||||
|
||||
from homeassistant.components.cover import ATTR_CURRENT_POSITION
|
||||
|
||||
from .const import LOGGER, TIMEOUT
|
||||
@@ -26,6 +30,7 @@ ATTR_ACTIVITY: Final[str] = "activity"
|
||||
|
||||
|
||||
SHADE_TYPE: Final[dict[int, str]] = {
|
||||
# up down only
|
||||
1: "Designer Roller",
|
||||
4: "Roman",
|
||||
5: "Bottom Up",
|
||||
@@ -39,6 +44,11 @@ SHADE_TYPE: Final[dict[int, str]] = {
|
||||
52: "Banded Shades",
|
||||
53: "Sonnette",
|
||||
84: "Vignette",
|
||||
# top down bottom up
|
||||
8: "Duette, Top Down Bottom Up",
|
||||
9: "Duette DuoLite, Top Down Bottom Up",
|
||||
33: "Duette Architella, Top Down Bottom Up",
|
||||
47: "Pleated, Top Down Bottom Up",
|
||||
}
|
||||
|
||||
OPEN_POSITION: Final[int] = 100
|
||||
@@ -93,13 +103,13 @@ class PowerViewBLE:
|
||||
],
|
||||
)
|
||||
self._data_event = asyncio.Event()
|
||||
self._data: bytearray
|
||||
self._data: bytes = b""
|
||||
self._info: PVDeviceInfo = PVDeviceInfo()
|
||||
self._cmd_lock: Final = asyncio.Lock()
|
||||
self._cmd_next = None
|
||||
self._is_encrypted: bool = False
|
||||
self._cipher: Final = (
|
||||
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
|
||||
self._cmd_lock: Final = asyncio.Lock()
|
||||
self._cmd_next: tuple[ShadeCmd, bytes]
|
||||
self._cipher: Final[Cipher | None] = (
|
||||
Cipher(algorithms.AES(home_key), modes.CTR(bytes(16)))
|
||||
if len(home_key) == 16
|
||||
else None
|
||||
)
|
||||
@@ -129,7 +139,7 @@ class PowerViewBLE:
|
||||
|
||||
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
|
||||
async def _cmd(
|
||||
self, cmd: tuple[ShadeCmd, bytearray], disconnect: bool = True
|
||||
self, cmd: tuple[ShadeCmd, bytes], disconnect: bool = True
|
||||
) -> None:
|
||||
self._cmd_next = cmd
|
||||
if self._cmd_lock.locked():
|
||||
@@ -139,17 +149,15 @@ class PowerViewBLE:
|
||||
async with self._cmd_lock:
|
||||
try:
|
||||
await self._connect()
|
||||
cmd_run = self._cmd_next
|
||||
tx_data = (
|
||||
bytearray(
|
||||
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
|
||||
+ bytes([self._seqcnt, len(cmd_run[1])])
|
||||
)
|
||||
cmd_run: tuple[ShadeCmd, bytes] = self._cmd_next
|
||||
tx_data: bytes = bytes(
|
||||
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
|
||||
+ bytes([self._seqcnt, len(cmd_run[1])])
|
||||
+ cmd_run[1]
|
||||
)
|
||||
LOGGER.debug("sending cmd: %s", tx_data.hex(" "))
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
enc = self._cipher.encryptor()
|
||||
enc: AEADEncryptionContext = self._cipher.encryptor()
|
||||
tx_data = enc.update(tx_data) + enc.finalize()
|
||||
LOGGER.debug(" encrypted: %s", tx_data.hex(" "))
|
||||
self._data_event.clear()
|
||||
@@ -174,8 +182,8 @@ class PowerViewBLE:
|
||||
if len(data) != 9:
|
||||
LOGGER.debug("not a V2 record!")
|
||||
return []
|
||||
pos = int.from_bytes(data[3:5], byteorder="little")
|
||||
pos2 = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||
pos: int = int.from_bytes(data[3:5], byteorder="little")
|
||||
pos2: int = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||
return [
|
||||
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
|
||||
("position2", pos2 >> 2),
|
||||
@@ -198,7 +206,7 @@ class PowerViewBLE:
|
||||
await self._cmd(
|
||||
(
|
||||
ShadeCmd.SET_POSITION,
|
||||
bytearray(
|
||||
bytes(
|
||||
int.to_bytes(value * 100, 2, byteorder="little")
|
||||
+ bytes([0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x0])
|
||||
),
|
||||
@@ -214,7 +222,7 @@ class PowerViewBLE:
|
||||
async def stop(self) -> None:
|
||||
"""Stop device movement."""
|
||||
LOGGER.debug("%s stop", self.name)
|
||||
await self._cmd((ShadeCmd.STOP, bytearray()))
|
||||
await self._cmd((ShadeCmd.STOP, b""))
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Fully close cover."""
|
||||
@@ -230,19 +238,19 @@ class PowerViewBLE:
|
||||
await self._cmd(
|
||||
(
|
||||
ShadeCmd.ACTIVATE_SCENE,
|
||||
bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])),
|
||||
int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2]),
|
||||
),
|
||||
)
|
||||
|
||||
async def identify(self, beeps: int = 0x3) -> None:
|
||||
"""Identify device."""
|
||||
LOGGER.debug("%s identify (%i)", self.name, beeps)
|
||||
await self._cmd((ShadeCmd.IDENTIFY, bytearray([min(beeps, 0xFF)])))
|
||||
await self._cmd((ShadeCmd.IDENTIFY, bytes([min(beeps, 0xFF)])))
|
||||
|
||||
def _verify_response(self, data: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
|
||||
def _verify_response(self, data: bytes, seq_nr: int, cmd: ShadeCmd) -> bool:
|
||||
"""Verify shade response data."""
|
||||
if len(data) < 4:
|
||||
LOGGER.error("Reponse message too short")
|
||||
LOGGER.error("Response message too short")
|
||||
return False
|
||||
if int.from_bytes(data[0:2], byteorder="little") != cmd.value & 0xFFEF:
|
||||
LOGGER.warning("Response to wrong command")
|
||||
@@ -298,10 +306,10 @@ class PowerViewBLE:
|
||||
|
||||
def _notification_handler(self, _sender, data: bytearray) -> None:
|
||||
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
|
||||
self._data = data
|
||||
self._data = bytes(data)
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
dec: CipherContext = self._cipher.decryptor()
|
||||
self._data = bytearray(dec.update(data) + dec.finalize())
|
||||
dec: AEADDecryptionContext = self._cipher.decryptor()
|
||||
self._data = bytes(dec.update(bytes(data)) + dec.finalize())
|
||||
LOGGER.debug("%s %s", "decoded data: ".rjust(19+len(self.name)), self._data.hex(" "))
|
||||
|
||||
self._data_event.set()
|
||||
|
||||
Reference in New Issue
Block a user