stronger typing

This commit is contained in:
patman15
2024-12-22 13:35:29 +01:00
parent b550f98e2b
commit b6b7e43b02
2 changed files with 27 additions and 17 deletions

View File

@@ -12,6 +12,10 @@ from bleak.exc import BleakError
from bleak.uuids import normalize_uuid_str
from bleak_retry_connector import establish_connection
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.ciphers.base import (
AEADDecryptionContext,
AEADEncryptionContext,
)
from homeassistant.components.cover import ATTR_CURRENT_POSITION
from .const import LOGGER, TIMEOUT
@@ -25,6 +29,7 @@ ATTR_ACTIVITY: Final[str] = "activity"
SHADE_TYPE: Final[dict[int, str]] = {
# up down only
1: "Designer Roller",
4: "Roman",
5: "Bottom Up",
@@ -38,6 +43,11 @@ SHADE_TYPE: Final[dict[int, str]] = {
52: "Banded Shades",
53: "Sonnette",
84: "Vignette",
# top down bottom up
8: "Duette, Top Down Bottom Up",
9: "Duette DuoLite, Top Down Bottom Up",
33: "Duette Architella, Top Down Bottom Up",
47: "Pleated, Top Down Bottom Up",
}
OPEN_POSITION: Final[int] = 100
@@ -91,11 +101,11 @@ class PowerViewBLE:
],
)
self._data_event = asyncio.Event()
self._data: bytearray
self._data: bytearray = bytearray()
self._info: PVDeviceInfo = PVDeviceInfo()
self._cmd_lock: Final = asyncio.Lock()
self._cmd_next = None
self._cipher: Final = (
self._cmd_next: tuple[ShadeCmd, bytes]
self._cipher: Final[Cipher | None] = (
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
if len(home_key) == 16
else None
@@ -117,7 +127,7 @@ class PowerViewBLE:
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
async def _cmd(
self, cmd: tuple[ShadeCmd, bytearray], disconnect: bool = True
self, cmd: tuple[ShadeCmd, bytes], disconnect: bool = True
) -> None:
self._cmd_next = cmd
if self._cmd_lock.locked():
@@ -127,16 +137,14 @@ class PowerViewBLE:
async with self._cmd_lock:
try:
await self._connect()
cmd_run = self._cmd_next
tx_data = (
bytearray(
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
+ bytes([self._seqcnt, len(cmd_run[1])])
)
cmd_run: tuple[ShadeCmd, bytes] = self._cmd_next
tx_data: bytes = bytes(
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
+ bytes([self._seqcnt, len(cmd_run[1])])
+ cmd_run[1]
)
if self._cipher is not None:
enc = self._cipher.encryptor()
enc: AEADEncryptionContext = self._cipher.encryptor()
tx_data = enc.update(tx_data) + enc.finalize()
self._data_event.clear()
LOGGER.debug("sending cmd: %s", tx_data)
@@ -161,8 +169,8 @@ class PowerViewBLE:
if len(data) != 9:
LOGGER.debug("not a V2 record!")
return []
pos = int.from_bytes(data[3:5], byteorder="little")
pos2 = (int(data[5]) << 4) + (int(data[4]) >> 4)
pos: int = int.from_bytes(data[3:5], byteorder="little")
pos2: int = (int(data[5]) << 4) + (int(data[4]) >> 4)
return [
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
("position2", pos2 >> 2),
@@ -185,7 +193,7 @@ class PowerViewBLE:
await self._cmd(
(
ShadeCmd.SET_POSITION,
bytearray(
bytes(
int.to_bytes(value * 100, 2, byteorder="little")
+ bytes([0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x0])
),
@@ -201,7 +209,7 @@ class PowerViewBLE:
async def stop(self) -> None:
"""Stop device movement."""
LOGGER.debug("%s stop", self.name)
await self._cmd((ShadeCmd.STOP, bytearray(b"")))
await self._cmd((ShadeCmd.STOP, b""))
async def close(self) -> None:
"""Fully close cover."""
@@ -217,7 +225,7 @@ class PowerViewBLE:
await self._cmd(
(
ShadeCmd.ACTIVATE_SCENE,
bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])),
int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2]),
),
)
@@ -225,7 +233,7 @@ class PowerViewBLE:
"""Verify shade response data."""
data: bytearray = din
if self._cipher is not None:
dec = self._cipher.decryptor()
dec: AEADDecryptionContext = self._cipher.decryptor()
data = bytearray(dec.update(din) + dec.finalize())
if len(data) < 4:
LOGGER.error("Reponse message too short")