support identification

This commit is contained in:
patman15
2024-12-30 17:20:11 +01:00
parent ba487d907d
commit 591815652d
3 changed files with 91 additions and 11 deletions

View File

@@ -12,6 +12,7 @@ from bleak.exc import BleakError
from bleak.uuids import normalize_uuid_str
from bleak_retry_connector import establish_connection
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.ciphers.base import CipherContext
from homeassistant.components.cover import ATTR_CURRENT_POSITION
from .const import LOGGER, TIMEOUT
@@ -58,6 +59,7 @@ class ShadeCmd(Enum):
SET_POSITION = 0x01F7
STOP = 0xB8F7
ACTIVATE_SCENE = 0xBAF7
IDENTIFY = 0x11F7
@dataclass
@@ -112,7 +114,7 @@ class PowerViewBLE:
return self._is_encrypted
@encrypted.setter
def encrypted(self, value:bool) -> None:
def encrypted(self, value: bool) -> None:
self._is_encrypted = value
@property
@@ -212,7 +214,7 @@ class PowerViewBLE:
async def stop(self) -> None:
"""Stop device movement."""
LOGGER.debug("%s stop", self.name)
await self._cmd((ShadeCmd.STOP, bytearray(b"")))
await self._cmd((ShadeCmd.STOP, bytearray()))
async def close(self) -> None:
"""Fully close cover."""
@@ -232,12 +234,13 @@ class PowerViewBLE:
),
)
def _verify_response(self, din: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
async def identify(self, beeps: int = 0x3) -> None:
"""Identify device."""
LOGGER.debug("%s identify (%i)", self.name, beeps)
await self._cmd((ShadeCmd.IDENTIFY, bytearray([min(beeps, 0xFF)])))
def _verify_response(self, data: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
"""Verify shade response data."""
data: bytearray = din
if self._cipher is not None and self._is_encrypted:
dec = self._cipher.decryptor()
data = bytearray(dec.update(din) + dec.finalize())
if len(data) < 4:
LOGGER.error("Reponse message too short")
return False
@@ -253,7 +256,7 @@ class PowerViewBLE:
LOGGER.error("Wrong response data length")
return False
if int(data[4] != 0):
LOGGER.error("Command %d returned error #%d", cmd.value, int(data[4]))
LOGGER.error("Command %X returned error #%d", cmd.value, int(data[4]))
return False
return True
@@ -291,8 +294,13 @@ class PowerViewBLE:
LOGGER.debug("Disconnected from %s", client.address)
def _notification_handler(self, _sender, data: bytearray) -> None:
LOGGER.debug("%s received BLE data: %s", self.name, data)
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
self._data = data
if self._cipher is not None and self._is_encrypted:
dec: CipherContext = self._cipher.decryptor()
self._data = bytearray(dec.update(data) + dec.finalize())
LOGGER.debug("%s %s", "decoded data: ".rjust(19+len(self.name)), self._data.hex(" "))
self._data_event.set()
async def _connect(self) -> None:
@@ -304,7 +312,7 @@ class PowerViewBLE:
LOGGER.debug("%s already connected", self.name)
return
start = time.time()
start: float = time.time()
self._client = await establish_connection(
BleakClient,
self._ble_device,