Initial commit
Basic functionality could work (untested!)
This commit is contained in:
128
custom_components/hunterdouglas_powerview_ble/api.py
Normal file
128
custom_components/hunterdouglas_powerview_ble/api.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""Hunter Douglas PowerView BLE API."""
|
||||
|
||||
import asyncio
|
||||
from enum import Enum
|
||||
|
||||
from bleak import BleakClient
|
||||
from bleak.backends.device import BLEDevice
|
||||
from bleak.exc import BleakError
|
||||
|
||||
from .const import LOGGER
|
||||
|
||||
|
||||
class shade_cmd(Enum):
|
||||
"""The PowerView cover commands."""
|
||||
|
||||
set_position = 0x01F7
|
||||
stop = 0xB8F7
|
||||
activate_scene = 0xBAF7
|
||||
|
||||
|
||||
class PowerViewBLE:
|
||||
"""Class to handle connection to PowerView remote device."""
|
||||
|
||||
UUID_SERVICE = "0000fdc1-0000-1000-8000-00805f9b34fb"
|
||||
UUID_TX = "cafe1001-c0ff-ee01-8000-A110CA7AB1E0"
|
||||
|
||||
def __init__(self, ble_device: BLEDevice) -> None:
|
||||
"""Initialize device API via Bluetooth."""
|
||||
self._ble_device: BLEDevice = ble_device
|
||||
self.name = self._ble_device.name
|
||||
self.seqcnt: int = 0
|
||||
self._client: BleakClient | None = None
|
||||
self._data_event = asyncio.Event()
|
||||
|
||||
async def _wait_event(self) -> None:
|
||||
await self._data_event.wait()
|
||||
self._data_event.clear()
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
"""Return whether remote device is connected."""
|
||||
return self._client is not None and self._client.is_connected
|
||||
|
||||
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
|
||||
async def _cmd(self, cmd: shade_cmd, data: bytearray) -> None:
|
||||
try:
|
||||
await self._connect()
|
||||
assert self._client is not None, "missing BT client"
|
||||
tx_data = (
|
||||
bytearray(
|
||||
int.to_bytes(cmd.value, 2, byteorder="little")
|
||||
+ bytes([self.seqcnt, len(data)])
|
||||
)
|
||||
+ data
|
||||
)
|
||||
self.seqcnt += 1
|
||||
await self._client.write_gatt_char(self.UUID_TX, tx_data, False)
|
||||
finally:
|
||||
await self._disconnect()
|
||||
|
||||
# position cmd: uint16_t pos1, uint16_t pos2, uint16_t pos3, uint16_t tilt, uint8_t velocity
|
||||
async def set_position(self, value: int) -> None:
|
||||
"""Set position of device."""
|
||||
LOGGER.debug("%s setting position to %i", self.name, value)
|
||||
await self._cmd(
|
||||
shade_cmd.set_position,
|
||||
bytearray(
|
||||
int.to_bytes(value * 100, 2, byteorder="little")
|
||||
+ bytes([0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x0])
|
||||
),
|
||||
)
|
||||
|
||||
# TODO: currently HA / Bleak communication is too slow to make that function reasonable
|
||||
# def stop(self) -> None:
|
||||
# """Stop device movement."""
|
||||
# LOGGER.debug("%s stop")
|
||||
|
||||
# uint8_t scene#, uint8_t unknown
|
||||
# open: scene 2
|
||||
# close: scene 3
|
||||
async def activate_scene(self, idx: int) -> None:
|
||||
"""Stop device movement."""
|
||||
LOGGER.debug("%s set scene #%i", self.name, idx)
|
||||
await self._cmd(
|
||||
shade_cmd.activate_scene,
|
||||
bytearray(
|
||||
int.to_bytes(idx, 1, byteorder="little")
|
||||
+ bytes([0xA2])
|
||||
),
|
||||
)
|
||||
|
||||
def _on_disconnect(self, client: BleakClient) -> None:
|
||||
"""Disconnect callback function."""
|
||||
|
||||
LOGGER.debug("Disconnected from %s", client.address)
|
||||
|
||||
def _notification_handler(self, sender, data: bytearray) -> None:
|
||||
LOGGER.debug("%s received BLE data: %s", self.name, data)
|
||||
|
||||
self._data_event.set()
|
||||
|
||||
async def _connect(self) -> None:
|
||||
"""Connect to the device and setup notification if not connected."""
|
||||
|
||||
if not self.is_connected:
|
||||
LOGGER.debug("Connecting %s", self.name)
|
||||
self._client = BleakClient(
|
||||
self._ble_device,
|
||||
disconnected_callback=self._on_disconnect,
|
||||
services=[self.UUID_SERVICE],
|
||||
)
|
||||
await self._client.connect()
|
||||
await self._client.start_notify(self.UUID_TX, self._notification_handler)
|
||||
else:
|
||||
LOGGER.debug("%s already connected", self.name)
|
||||
|
||||
async def _disconnect(self) -> None:
|
||||
"""Disconnect the device and stop notifications."""
|
||||
|
||||
if self._client and self.is_connected:
|
||||
LOGGER.debug("Disconnecting device %s", self.name)
|
||||
try:
|
||||
self._data_event.clear()
|
||||
await self._client.disconnect()
|
||||
except BleakError:
|
||||
LOGGER.warning("Disconnect failed!")
|
||||
|
||||
self._client = None
|
||||
Reference in New Issue
Block a user