stricter ruff formatting
This commit is contained in:
@@ -1,28 +1,27 @@
|
||||
"""Hunter Douglas PowerView BLE API."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
import time
|
||||
from typing import Final
|
||||
|
||||
from bleak import BleakClient
|
||||
from bleak.backends.device import BLEDevice
|
||||
from bleak.exc import BleakError
|
||||
from bleak.uuids import normalize_uuid_str
|
||||
from bleak_retry_connector import close_stale_connections, establish_connection
|
||||
from bleak_retry_connector import establish_connection
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
|
||||
from homeassistant.components.cover import ATTR_CURRENT_POSITION
|
||||
|
||||
from .const import LOGGER, TIMEOUT
|
||||
|
||||
UUID_COV_SERVICE: Final = normalize_uuid_str("fdc1")
|
||||
UUID_TX: Final = "cafe1001-c0ff-ee01-8000-a110ca7ab1e0"
|
||||
UUID_DEV_SERVICE: Final = normalize_uuid_str("180a")
|
||||
UUID_BAT_SERVICE: Final = normalize_uuid_str("180f")
|
||||
UUID_COV_SERVICE: Final[str] = normalize_uuid_str("fdc1")
|
||||
UUID_TX: Final[str] = "cafe1001-c0ff-ee01-8000-a110ca7ab1e0"
|
||||
UUID_DEV_SERVICE: Final[str] = normalize_uuid_str("180a")
|
||||
UUID_BAT_SERVICE: Final[str] = normalize_uuid_str("180f")
|
||||
|
||||
ATTR_ACTIVITY: Final = "activity"
|
||||
ATTR_ACTIVITY: Final[str] = "activity"
|
||||
|
||||
|
||||
SHADE_TYPE: Final[dict[int, str]] = {
|
||||
@@ -82,7 +81,15 @@ class PowerViewBLE:
|
||||
self._ble_device: Final[BLEDevice] = ble_device
|
||||
self.name: Final[str] = self._ble_device.name or "unknown"
|
||||
self._seqcnt: int = 1
|
||||
self._client: BleakClient | None = None
|
||||
self._client: BleakClient = BleakClient(
|
||||
self._ble_device,
|
||||
disconnected_callback=self._on_disconnect,
|
||||
services=[
|
||||
UUID_COV_SERVICE,
|
||||
# self.UUID_DEV_SERVICE,
|
||||
# self.UUID_BAT_SERVICE,
|
||||
],
|
||||
)
|
||||
self._data_event = asyncio.Event()
|
||||
self._data: bytearray
|
||||
self._info: PVDeviceInfo = PVDeviceInfo()
|
||||
@@ -106,7 +113,7 @@ class PowerViewBLE:
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
"""Return whether remote device is connected."""
|
||||
return self._client is not None and self._client.is_connected
|
||||
return self._client.is_connected
|
||||
|
||||
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
|
||||
async def _cmd(
|
||||
@@ -120,7 +127,6 @@ class PowerViewBLE:
|
||||
async with self._cmd_lock:
|
||||
try:
|
||||
await self._connect()
|
||||
assert self._client is not None, "missing BT client"
|
||||
cmd_run = self._cmd_next
|
||||
tx_data = (
|
||||
bytearray(
|
||||
@@ -156,7 +162,7 @@ class PowerViewBLE:
|
||||
LOGGER.debug("not a V2 record!")
|
||||
return []
|
||||
pos = int.from_bytes(data[3:5], byteorder="little")
|
||||
pos2 = ((int(data[5]) << 4) + (int(data[4]) >> 4))
|
||||
pos2 = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||
return [
|
||||
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
|
||||
("position2", pos2 >> 2),
|
||||
@@ -215,12 +221,12 @@ class PowerViewBLE:
|
||||
),
|
||||
)
|
||||
|
||||
def _verify_response(self, input: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
|
||||
def _verify_response(self, din: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
|
||||
"""Verify shade response data."""
|
||||
data = input
|
||||
data: bytearray = din
|
||||
if self._cipher is not None:
|
||||
dec = self._cipher.decryptor()
|
||||
data = dec.update(input) + dec.finalize()
|
||||
data = bytearray(dec.update(din) + dec.finalize())
|
||||
if len(data) < 4:
|
||||
LOGGER.error("Reponse message too short")
|
||||
return False
|
||||
@@ -229,14 +235,14 @@ class PowerViewBLE:
|
||||
return False
|
||||
if int(data[2]) != seq_nr:
|
||||
LOGGER.warning(
|
||||
f"Response sequence id {int(data[2])} wrong, expected {seq_nr}"
|
||||
"Response sequence id %i wrong, expected %d", int(data[2]), seq_nr
|
||||
)
|
||||
return False
|
||||
if int(data[3]) != 1:
|
||||
LOGGER.error("Wrong response data length")
|
||||
return False
|
||||
if int(data[4] != 0):
|
||||
LOGGER.error(f"Command {cmd.value} returned error #{int(data[4])}")
|
||||
LOGGER.error("Command %d returned error #%d", cmd.value, int(data[4]))
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -255,7 +261,6 @@ class PowerViewBLE:
|
||||
async with self._cmd_lock:
|
||||
try:
|
||||
await self._connect()
|
||||
assert self._client is not None
|
||||
|
||||
for key, uuid in uuids.items():
|
||||
LOGGER.debug("querying %s(%s)", key, uuid)
|
||||
@@ -265,7 +270,7 @@ class PowerViewBLE:
|
||||
.decode("UTF-8")
|
||||
)
|
||||
finally:
|
||||
await self._disconnect()
|
||||
await self.disconnect()
|
||||
LOGGER.debug("%s device data: %s", self.name, data)
|
||||
return data.copy()
|
||||
|
||||
@@ -289,8 +294,6 @@ class PowerViewBLE:
|
||||
return
|
||||
|
||||
start = time.time()
|
||||
await close_stale_connections(self._ble_device)
|
||||
|
||||
self._client = await establish_connection(
|
||||
BleakClient,
|
||||
self._ble_device,
|
||||
@@ -308,10 +311,10 @@ class PowerViewBLE:
|
||||
|
||||
# await self._query_dev_info()
|
||||
|
||||
async def _disconnect(self) -> None:
|
||||
async def disconnect(self) -> None:
|
||||
"""Disconnect the device and stop notifications."""
|
||||
|
||||
if self._client is not None and self.is_connected:
|
||||
if self.is_connected:
|
||||
LOGGER.debug("Disconnecting device %s", self.name)
|
||||
try:
|
||||
self._data_event.clear()
|
||||
|
||||
Reference in New Issue
Block a user