made device disconnect optional

This commit is contained in:
patman15
2024-08-29 22:18:11 +02:00
parent 2a800f1db3
commit ef18509ca5
2 changed files with 52 additions and 29 deletions

View File

@@ -41,8 +41,8 @@ SHADE_TYPE: Final[dict[int, str]] = {
84: "Vignette",
}
OPEN_POSITION: Final = 100
CLOSED_POSITION: Final = 0
OPEN_POSITION: Final[int] = 100
CLOSED_POSITION: Final[int] = 0
POWER_LEVELS: Final[dict[int, int]] = {
4: 100, # 4 is hardwired
@@ -112,10 +112,12 @@ class PowerViewBLE:
return self._client is not None and self._client.is_connected
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
async def _cmd(self, cmd: ShadeCmd, data: bytearray) -> None:
async def _cmd(
self, cmd: tuple[ShadeCmd, bytearray], disconnect: bool = False
) -> None:
self._cmd_next = cmd
if self._cmd_lock.locked():
LOGGER.debug("%s: device busy, queing %s", self.name, cmd.name)
LOGGER.debug("%s: device busy, queuing %s command", self.name, cmd[0])
return
async with self._cmd_lock:
@@ -125,10 +127,10 @@ class PowerViewBLE:
cmd_run = self._cmd_next
tx_data = (
bytearray(
int.to_bytes(cmd_run.value, 2, byteorder="little")
+ bytes([self._seqcnt, len(data)])
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
+ bytes([self._seqcnt, len(cmd_run[1])])
)
+ data
+ cmd_run[1]
)
if self._cipher is not None:
enc = self._cipher.encryptor()
@@ -140,13 +142,14 @@ class PowerViewBLE:
LOGGER.debug("waiting for response")
try:
await asyncio.wait_for(self._wait_event(), timeout=TIMEOUT)
self._verify_response(self._data, self._seqcnt - 1, cmd_run)
self._verify_response(self._data, self._seqcnt - 1, cmd_run[0])
except TimeoutError as ex:
raise TimeoutError("Device did not send confirmation.") from ex
# finally:
# await self._client.disconnect() # device disconnects itself
finally:
if disconnect:
await self._client.disconnect() # device disconnects itself
except Exception as ex:
LOGGER.debug("Error: %s - %s", type(ex).__name__, ex)
LOGGER.error("Error: %s - %s", type(ex).__name__, ex)
raise
@staticmethod
@@ -169,31 +172,49 @@ class PowerViewBLE:
]
# position cmd: uint16_t pos1, uint16_t pos2, uint16_t pos3, uint16_t tilt, uint8_t velocity
async def set_position(self, value: int) -> None:
"""Set position of device."""
LOGGER.debug("%s setting position to %i", self.name, value)
async def _set_position(self, value: int, disconnect: bool = False) -> None:
await self._cmd(
(
ShadeCmd.SET_POSITION,
bytearray(
int.to_bytes(value * 100, 2, byteorder="little")
+ bytes([0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x0])
),
),
disconnect,
)
async def set_position(self, value: int) -> None:
"""Set position of device."""
LOGGER.debug("%s setting position to %i", self.name, value)
await self._set_position(value, disconnect=True)
async def open(self) -> None:
"""Fully open cover."""
LOGGER.debug("%s open", self.name)
await self._set_position(OPEN_POSITION)
async def stop(self) -> None:
"""Stop device movement."""
LOGGER.debug("%s stop", self.name)
await self._cmd(ShadeCmd.STOP, bytearray(b""))
await self._cmd((ShadeCmd.STOP, bytearray(b"")))
async def close(self) -> None:
"""Fully close cover."""
LOGGER.debug("%s close", self.name)
await self._set_position(CLOSED_POSITION)
# uint8_t scene#, uint8_t unknown
# open: scene 2 (programmed by app?)
# close: scene 3 (programmed by app?)
# open: scene 2
# close: scene 3
async def activate_scene(self, idx: int) -> None:
"""Activate stored scene."""
LOGGER.debug("%s set scene #%i", self.name, idx)
await self._cmd(
(
ShadeCmd.ACTIVATE_SCENE,
bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])),
),
)
def _verify_response(self, input: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
@@ -203,19 +224,21 @@ class PowerViewBLE:
dec = self._cipher.decryptor()
data = dec.update(input) + dec.finalize()
if len(data) < 4:
LOGGER.warning("Message too short")
LOGGER.error("Reponse message too short")
return False
if int.from_bytes(data[0:2], byteorder="little") != cmd.value & 0xFFEF:
LOGGER.warning("Response to wrong command")
return False
if int(data[2]) != seq_nr:
LOGGER.warning("Wrong sequence number")
LOGGER.warning(
f"Response sequence id {int(data[2])} wrong, expected {seq_nr}"
)
return False
if int(data[3]) != 1:
LOGGER.warning("Wrong response data length")
LOGGER.error("Wrong response data length")
return False
if int(data[4] != 0):
LOGGER.warning("Return code type error")
LOGGER.error(f"Command {cmd.value} returned error #{int(data[4])}")
return False
return True
@@ -271,7 +294,7 @@ class PowerViewBLE:
start = time.time()
await close_stale_connections(self._ble_device)
LOGGER.debug("%s: closing stale connections took %is", self.name, time.time()-start)
self._client = await establish_connection(
BleakClient,
self._ble_device,

View File

@@ -16,7 +16,7 @@ from typing import Final
DOMAIN: Final = "hunterdouglas_powerview_ble"
LOGGER: Final = logging.getLogger(__package__)
MFCT_ID: Final = 2073
TIMEOUT: Final = 15
TIMEOUT: Final = 5
HOME_KEY: Final = b""