Compare commits
37 Commits
feature/ty
...
improve-co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f260416676 | ||
|
|
abb0a3e8a3 | ||
|
|
652337e32c | ||
|
|
af08d18d62 | ||
|
|
894580c20b | ||
|
|
317b450702 | ||
|
|
31185a4446 | ||
|
|
04c7036351 | ||
|
|
d66cf61887 | ||
|
|
5d498b8753 | ||
|
|
b558083b50 | ||
|
|
3775496936 | ||
|
|
883aca753e | ||
|
|
fe3646df27 | ||
|
|
bae4158a3c | ||
|
|
d9ebc54026 | ||
|
|
62bbfd7361 | ||
|
|
30dff09bb1 | ||
|
|
74d906151e | ||
|
|
9773e5df65 | ||
|
|
b2d5335e1d | ||
|
|
a6aaf4d727 | ||
|
|
f2ad61a016 | ||
|
|
f6ec17c9b2 | ||
|
|
d79096357d | ||
|
|
52f7390fc0 | ||
|
|
c9a27388af | ||
|
|
1590647c9e | ||
|
|
1559776aa8 | ||
|
|
2c03881b60 | ||
|
|
0bc5644883 | ||
|
|
e97eef94f8 | ||
|
|
4b51ea514f | ||
|
|
591815652d | ||
|
|
ba487d907d | ||
|
|
8b0bccee6b | ||
|
|
054f35f838 |
16
.github/workflows/lint.yml
vendored
16
.github/workflows/lint.yml
vendored
@@ -8,8 +8,8 @@ on:
|
||||
- cron: '0 5 * * 6'
|
||||
|
||||
jobs:
|
||||
ruff:
|
||||
name: "Ruff"
|
||||
lint:
|
||||
name: "Lint the code"
|
||||
runs-on: "ubuntu-latest"
|
||||
steps:
|
||||
- name: "Checkout the repository"
|
||||
@@ -18,11 +18,17 @@ jobs:
|
||||
- name: "Set up Python"
|
||||
uses: actions/setup-python@main
|
||||
with:
|
||||
python-version: "3.12"
|
||||
python-version: "3.13.2"
|
||||
cache: "pip"
|
||||
|
||||
- name: "Install requirements"
|
||||
run: python3 -m pip install -r requirements.txt
|
||||
|
||||
- name: "Run Ruff"
|
||||
run: python3 -m ruff check .
|
||||
- name: "Run ruff"
|
||||
run: ruff check .
|
||||
|
||||
- name: "Run mypy"
|
||||
run: mypy .
|
||||
|
||||
- name: "Run codespell"
|
||||
run: codespell -L hass
|
||||
39
README.md
39
README.md
@@ -8,8 +8,7 @@
|
||||
> [!WARNING]
|
||||
> - This integration is under development!
|
||||
> - Test coverage is low, malfunction might occur.
|
||||
> - Only devices that are **not** added to the app are controlable. It is possible to add them to the app if you just want to monitor the status (position, battery) in Home Assistant.
|
||||
> - Currently only position change is supported (e.g., no tilt)
|
||||
> - The HOME_KEY is lost over updates!
|
||||
|
||||
## Features
|
||||
- Zero configuration
|
||||
@@ -18,7 +17,7 @@
|
||||
### Supported Devices
|
||||
|
||||
Type* | Description
|
||||
-- | --
|
||||
-- | --
|
||||
1 | Designer Roller
|
||||
4 | Roman
|
||||
5 | Bottom Up
|
||||
@@ -26,6 +25,7 @@ Type* | Description
|
||||
10 | Duette and Applause SkyLift
|
||||
19 | Provenance Woven Wood
|
||||
31, 32, 84 | Vignette
|
||||
39 | Parkland
|
||||
42 | M25T Roller Blind
|
||||
49 | AC Roller
|
||||
52 | Banded Shades
|
||||
@@ -39,10 +39,14 @@ The integration provides the following information about the battery
|
||||
Platform | Description | Unit | Details
|
||||
-- | -- | -- | --
|
||||
`binary_sensor` | battery charging indicator | `bool` | true if battery is charging
|
||||
`button` | identify shade | - | identify shade by LED and 3 beeps
|
||||
`cover` | view/control position | `%` | percentage cover is open (100% is open)
|
||||
`sensor` | SoC (state of charge) | `%` | range 100% (full), 50%, 20%, 0% (battery empty)
|
||||
|
||||
## Installation
|
||||
> [!IMPORTANT]
|
||||
> In case you added your shades to the app or a gateway, you need to [set the encryption key](#set-the-encryption-key) manually in the [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py) file after **each** update!
|
||||
|
||||
### Automatic
|
||||
Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom repository](https://hacs.xyz/docs/faq/custom_repositories/).
|
||||
|
||||
@@ -57,12 +61,22 @@ Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom rep
|
||||
1. Restart Home Assistant
|
||||
1. In the HA UI go to "Configuration" -> "Integrations" click "+" and search for "Hunter Douglas PowerView (BLE)"
|
||||
|
||||
## Set the Encryption Key
|
||||
Currently, there are three methods to obtain the key:
|
||||
|
||||
## Outlook
|
||||
- Add support for encryption
|
||||
- Allow parallel usage to PowerView app as "remote"
|
||||
- Add support for tilt function
|
||||
- Add support for further device types
|
||||
1. Via adopting a BLE shade: There is a [shade emulator](/emu/PV_BLE_cover) that works with Arduino IDE and an ESP32 device (≥ 2MiB flash, ≥ 128KiB required), e.g. [Adafruit QT Py ESP32-S3](https://www.adafruit.com/product/5426). Install and connect via serial port, then go to the PowerView app and add the shade `myPVcover` to your home. You will see a log message `set shade key: \xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx` . Copy this key. You can delete the shade from the app when done.
|
||||
2. Extracting from gateway: This [script](scripts/extract_gateway3_homekey.py) is able to extract the key from a working PowerView gateway.
|
||||
3. Grabbing from the app: Checkout this [post in the Home Assistant community forum](https://community.home-assistant.io/t/hunter-douglas-powerview-gen-3-integration/424836/228).
|
||||
|
||||
Finally, you need to manually copy the key to [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py).
|
||||
|
||||
> [!IMPORTANT]
|
||||
> You need to update the file after **each** update!
|
||||
|
||||
## Known Issues
|
||||
<details><summary>Shade inoperable after charging</summary>
|
||||
It seems that the shades require some re-initialization after charging. The solution is currently unknown, but as a workaround you can operate the shade ones using the vendor app.
|
||||
</details>
|
||||
|
||||
## Troubleshooting
|
||||
In case you have severe troubles,
|
||||
@@ -72,6 +86,15 @@ In case you have severe troubles,
|
||||
- disable the log (Home Assistant will prompt you to download the log), and finally
|
||||
- [open an issue](https://github.com/patman15/hdpv_ble/issues/new?assignees=&labels=Bug&projects=&template=bug.yml) with a good description of what happened and attach the log.
|
||||
|
||||
# Thanks To
|
||||
[@mannkind](https://github.com/mannkind)
|
||||
|
||||
[license-shield]: https://img.shields.io/github/license/patman15/hdpv_ble.svg?style=for-the-badge
|
||||
[releases-shield]: https://img.shields.io/github/release/patman15/hdpv_ble.svg?style=for-the-badge
|
||||
[releases]: https://github.com//patman15/hdpv_ble/releases
|
||||
|
||||
## Outlook
|
||||
- Add tests!
|
||||
- Allow parallel usage to PowerView app as "remote"
|
||||
- Add support for tilt function
|
||||
- Add support for further device types
|
||||
|
||||
@@ -4,17 +4,24 @@
|
||||
@license: Apache-2.0 license
|
||||
"""
|
||||
|
||||
from bleak.backends.device import BLEDevice
|
||||
from bleak.exc import BleakError
|
||||
|
||||
from homeassistant.components.bluetooth import async_ble_device_from_address
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import Platform
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
|
||||
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .const import LOGGER
|
||||
from .coordinator import PVCoordinator
|
||||
|
||||
PLATFORMS: list[Platform] = [Platform.BINARY_SENSOR, Platform.COVER, Platform.SENSOR]
|
||||
PLATFORMS: list[Platform] = [
|
||||
Platform.BINARY_SENSOR,
|
||||
Platform.COVER,
|
||||
Platform.SENSOR,
|
||||
Platform.BUTTON,
|
||||
]
|
||||
|
||||
type ConfigEntryType = ConfigEntry[PVCoordinator]
|
||||
|
||||
@@ -26,7 +33,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool
|
||||
if entry.unique_id is None:
|
||||
raise ConfigEntryError("Missing unique ID for device.")
|
||||
|
||||
ble_device = async_ble_device_from_address(
|
||||
ble_device: BLEDevice | None = async_ble_device_from_address(
|
||||
hass=hass, address=entry.unique_id, connectable=True
|
||||
)
|
||||
|
||||
@@ -35,14 +42,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool
|
||||
f"Could not find PowerView device ({entry.unique_id}) via Bluetooth"
|
||||
)
|
||||
|
||||
coordinator = PVCoordinator(hass, ble_device, entry.data.copy())
|
||||
coordinator = PVCoordinator(hass, ble_device, entry.data.copy(), entry.title)
|
||||
try:
|
||||
await coordinator.query_dev_info()
|
||||
except BleakError as err:
|
||||
raise ConfigEntryNotReady("Unable to query device info.") from err
|
||||
|
||||
# Insert the coordinator in the global registry
|
||||
hass.data.setdefault(DOMAIN, {})
|
||||
entry.runtime_data = coordinator
|
||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
entry.async_on_unload(coordinator.async_start())
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
"""Hunter Douglas PowerView BLE API."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
import time
|
||||
from typing import Final
|
||||
|
||||
from bleak import BleakClient
|
||||
@@ -12,7 +12,15 @@ from bleak.exc import BleakError
|
||||
from bleak.uuids import normalize_uuid_str
|
||||
from bleak_retry_connector import establish_connection
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from homeassistant.components.cover import ATTR_CURRENT_POSITION
|
||||
from cryptography.hazmat.primitives.ciphers.base import (
|
||||
AEADDecryptionContext,
|
||||
AEADEncryptionContext,
|
||||
)
|
||||
|
||||
from homeassistant.components.cover import (
|
||||
ATTR_CURRENT_POSITION,
|
||||
ATTR_CURRENT_TILT_POSITION,
|
||||
)
|
||||
|
||||
from .const import LOGGER, TIMEOUT
|
||||
|
||||
@@ -25,6 +33,7 @@ ATTR_ACTIVITY: Final[str] = "activity"
|
||||
|
||||
|
||||
SHADE_TYPE: Final[dict[int, str]] = {
|
||||
# up down only
|
||||
1: "Designer Roller",
|
||||
4: "Roman",
|
||||
5: "Bottom Up",
|
||||
@@ -38,6 +47,15 @@ SHADE_TYPE: Final[dict[int, str]] = {
|
||||
52: "Banded Shades",
|
||||
53: "Sonnette",
|
||||
84: "Vignette",
|
||||
# top down bottom up
|
||||
8: "Duette, Top Down Bottom Up",
|
||||
9: "Duette DuoLite, Top Down Bottom Up",
|
||||
33: "Duette Architella, Top Down Bottom Up",
|
||||
39: "Parkland",
|
||||
47: "Pleated, Top Down Bottom Up",
|
||||
# top down, tilt anywhere
|
||||
51: "Venetian, Tilt Anywhere",
|
||||
62: "Venetian, Tilt Anywhere",
|
||||
}
|
||||
|
||||
OPEN_POSITION: Final[int] = 100
|
||||
@@ -58,6 +76,7 @@ class ShadeCmd(Enum):
|
||||
SET_POSITION = 0x01F7
|
||||
STOP = 0xB8F7
|
||||
ACTIVATE_SCENE = 0xBAF7
|
||||
IDENTIFY = 0x11F7
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -78,26 +97,18 @@ class PowerViewBLE:
|
||||
|
||||
def __init__(self, ble_device: BLEDevice, home_key: bytes = b"") -> None:
|
||||
"""Initialize device API via Bluetooth."""
|
||||
self._ble_device: Final[BLEDevice] = ble_device
|
||||
self._ble_device: BLEDevice = ble_device
|
||||
self.name: Final[str] = self._ble_device.name or "unknown"
|
||||
self._seqcnt: int = 1
|
||||
self._client: BleakClient = BleakClient(
|
||||
self._ble_device,
|
||||
disconnected_callback=self._on_disconnect,
|
||||
services=[
|
||||
UUID_COV_SERVICE,
|
||||
# self.UUID_DEV_SERVICE,
|
||||
# self.UUID_BAT_SERVICE,
|
||||
],
|
||||
)
|
||||
self._client: BleakClient = BleakClient(self._ble_device)
|
||||
self._data_event = asyncio.Event()
|
||||
self._data: bytearray
|
||||
self._data: bytes = b""
|
||||
self._info: PVDeviceInfo = PVDeviceInfo()
|
||||
self._cmd_lock: Final = asyncio.Lock()
|
||||
self._cmd_next = None
|
||||
self._is_encrypted: bool = False
|
||||
self._cipher: Final = (
|
||||
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
|
||||
self._cmd_lock: Final = asyncio.Lock()
|
||||
self._cmd_next: tuple[ShadeCmd, bytes]
|
||||
self._cipher: Final[Cipher | None] = (
|
||||
Cipher(algorithms.AES(home_key), modes.CTR(bytes(16)))
|
||||
if len(home_key) == 16
|
||||
else None
|
||||
)
|
||||
@@ -106,15 +117,24 @@ class PowerViewBLE:
|
||||
await self._data_event.wait()
|
||||
self._data_event.clear()
|
||||
|
||||
def set_ble_device(self, ble_device: BLEDevice) -> None:
|
||||
"""Update the BLE device reference (e.g. when proxy details change)."""
|
||||
self._ble_device = ble_device
|
||||
|
||||
@property
|
||||
def encrypted(self) -> bool:
|
||||
"""Return whether communication with this shade is encrypted."""
|
||||
return self._is_encrypted
|
||||
|
||||
@encrypted.setter
|
||||
def encrypted(self, value:bool) -> None:
|
||||
def encrypted(self, value: bool) -> None:
|
||||
self._is_encrypted = value
|
||||
|
||||
@property
|
||||
def has_key(self) -> bool:
|
||||
"""Return True if a valid homekey was provided."""
|
||||
return self._cipher is not None
|
||||
|
||||
@property
|
||||
def info(self) -> PVDeviceInfo:
|
||||
"""Return device information, e.g. SW version."""
|
||||
@@ -126,9 +146,7 @@ class PowerViewBLE:
|
||||
return self._client.is_connected
|
||||
|
||||
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
|
||||
async def _cmd(
|
||||
self, cmd: tuple[ShadeCmd, bytearray], disconnect: bool = True
|
||||
) -> None:
|
||||
async def _cmd(self, cmd: tuple[ShadeCmd, bytes], disconnect: bool = True) -> None:
|
||||
self._cmd_next = cmd
|
||||
if self._cmd_lock.locked():
|
||||
LOGGER.debug("%s: device busy, queuing %s command", self.name, cmd[0])
|
||||
@@ -137,19 +155,18 @@ class PowerViewBLE:
|
||||
async with self._cmd_lock:
|
||||
try:
|
||||
await self._connect()
|
||||
cmd_run = self._cmd_next
|
||||
tx_data = (
|
||||
bytearray(
|
||||
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
|
||||
+ bytes([self._seqcnt, len(cmd_run[1])])
|
||||
)
|
||||
cmd_run: tuple[ShadeCmd, bytes] = self._cmd_next
|
||||
tx_data: bytes = bytes(
|
||||
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
|
||||
+ bytes([self._seqcnt, len(cmd_run[1])])
|
||||
+ cmd_run[1]
|
||||
)
|
||||
LOGGER.debug("sending cmd: %s", tx_data.hex(" "))
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
enc = self._cipher.encryptor()
|
||||
enc: AEADEncryptionContext = self._cipher.encryptor()
|
||||
tx_data = enc.update(tx_data) + enc.finalize()
|
||||
LOGGER.debug(" encrypted: %s", tx_data.hex(" "))
|
||||
self._data_event.clear()
|
||||
LOGGER.debug("sending cmd: %s", tx_data)
|
||||
await self._client.write_gatt_char(UUID_TX, tx_data, False)
|
||||
self._seqcnt += 1
|
||||
LOGGER.debug("waiting for response")
|
||||
@@ -171,15 +188,15 @@ class PowerViewBLE:
|
||||
if len(data) != 9:
|
||||
LOGGER.debug("not a V2 record!")
|
||||
return []
|
||||
pos = int.from_bytes(data[3:5], byteorder="little")
|
||||
pos2 = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||
pos: Final[int] = int.from_bytes(data[3:5], byteorder="little")
|
||||
pos2: Final[int] = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||
return [
|
||||
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
|
||||
("position2", pos2 >> 2),
|
||||
("position3", int(data[6])),
|
||||
("tilt", int(data[7])),
|
||||
(ATTR_CURRENT_TILT_POSITION, int(data[7])),
|
||||
("home_id", int.from_bytes(data[0:2], byteorder="little")),
|
||||
("type_id", int.from_bytes(data[2:3])),
|
||||
("type_id", int(data[2])),
|
||||
("is_opening", bool(pos & 0x3 == 0x2)),
|
||||
("is_closing", bool(pos & 0x3 == 0x1)),
|
||||
("battery_charging", bool(pos & 0x3 == 0x3)), # observed
|
||||
@@ -189,16 +206,33 @@ class PowerViewBLE:
|
||||
]
|
||||
|
||||
# position cmd: uint16_t pos1, uint16_t pos2, uint16_t pos3, uint16_t tilt, uint8_t velocity
|
||||
async def set_position(self, value: int, disconnect: bool = True) -> None:
|
||||
async def set_position(
|
||||
self,
|
||||
pos1: int,
|
||||
pos2: int = 0x8000,
|
||||
pos3: int = 0x8000,
|
||||
tilt: int = 0x8000,
|
||||
velocity: int = 0x0,
|
||||
disconnect: bool = True,
|
||||
) -> None:
|
||||
"""Set position of device."""
|
||||
LOGGER.debug("%s setting position to %i", self.name, value)
|
||||
LOGGER.debug(
|
||||
"%s setting position to %i/%i/%i, tilt %i, velocity %s",
|
||||
self.name,
|
||||
pos1,
|
||||
pos2,
|
||||
pos3,
|
||||
tilt,
|
||||
velocity,
|
||||
)
|
||||
await self._cmd(
|
||||
(
|
||||
ShadeCmd.SET_POSITION,
|
||||
bytearray(
|
||||
int.to_bytes(value * 100, 2, byteorder="little")
|
||||
+ bytes([0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x0])
|
||||
),
|
||||
int.to_bytes(pos1 * 100, 2, byteorder="little")
|
||||
+ int.to_bytes(pos2, 2, byteorder="little")
|
||||
+ int.to_bytes(pos3, 2, byteorder="little")
|
||||
+ int.to_bytes(tilt, 2, byteorder="little")
|
||||
+ int.to_bytes(velocity, 1),
|
||||
),
|
||||
disconnect,
|
||||
)
|
||||
@@ -211,7 +245,7 @@ class PowerViewBLE:
|
||||
async def stop(self) -> None:
|
||||
"""Stop device movement."""
|
||||
LOGGER.debug("%s stop", self.name)
|
||||
await self._cmd((ShadeCmd.STOP, bytearray(b"")))
|
||||
await self._cmd((ShadeCmd.STOP, b""))
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Fully close cover."""
|
||||
@@ -227,18 +261,19 @@ class PowerViewBLE:
|
||||
await self._cmd(
|
||||
(
|
||||
ShadeCmd.ACTIVATE_SCENE,
|
||||
bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])),
|
||||
int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2]),
|
||||
),
|
||||
)
|
||||
|
||||
def _verify_response(self, din: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
|
||||
async def identify(self, beeps: int = 0x3) -> None:
|
||||
"""Identify device."""
|
||||
LOGGER.debug("%s identify (%i)", self.name, beeps)
|
||||
await self._cmd((ShadeCmd.IDENTIFY, bytes([min(beeps, 0xFF)])))
|
||||
|
||||
def _verify_response(self, data: bytes, seq_nr: int, cmd: ShadeCmd) -> bool:
|
||||
"""Verify shade response data."""
|
||||
data: bytearray = din
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
dec = self._cipher.decryptor()
|
||||
data = bytearray(dec.update(din) + dec.finalize())
|
||||
if len(data) < 4:
|
||||
LOGGER.error("Reponse message too short")
|
||||
LOGGER.error("Response message too short")
|
||||
return False
|
||||
if int.from_bytes(data[0:2], byteorder="little") != cmd.value & 0xFFEF:
|
||||
LOGGER.warning("Response to wrong command")
|
||||
@@ -252,7 +287,7 @@ class PowerViewBLE:
|
||||
LOGGER.error("Wrong response data length")
|
||||
return False
|
||||
if int(data[4] != 0):
|
||||
LOGGER.error("Command %d returned error #%d", cmd.value, int(data[4]))
|
||||
LOGGER.error("Command %X returned error #%d", cmd.value, int(data[4]))
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -279,6 +314,9 @@ class PowerViewBLE:
|
||||
.copy()
|
||||
.decode("UTF-8")
|
||||
)
|
||||
except BleakError as ex:
|
||||
LOGGER.debug("%s: querying failed: %s", self.name, ex)
|
||||
raise
|
||||
finally:
|
||||
await self.disconnect()
|
||||
LOGGER.debug("%s device data: %s", self.name, data)
|
||||
@@ -290,8 +328,17 @@ class PowerViewBLE:
|
||||
LOGGER.debug("Disconnected from %s", client.address)
|
||||
|
||||
def _notification_handler(self, _sender, data: bytearray) -> None:
|
||||
LOGGER.debug("%s received BLE data: %s", self.name, data)
|
||||
self._data = data
|
||||
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
|
||||
self._data = bytes(data)
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
dec: AEADDecryptionContext = self._cipher.decryptor()
|
||||
self._data = bytes(dec.update(bytes(data)) + dec.finalize())
|
||||
LOGGER.debug(
|
||||
"%s %s",
|
||||
"decoded data: ".rjust(19 + len(self.name)),
|
||||
self._data.hex(" "),
|
||||
)
|
||||
|
||||
self._data_event.set()
|
||||
|
||||
async def _connect(self) -> None:
|
||||
@@ -303,15 +350,16 @@ class PowerViewBLE:
|
||||
LOGGER.debug("%s already connected", self.name)
|
||||
return
|
||||
|
||||
start = time.time()
|
||||
start: float = time.time()
|
||||
self._client = await establish_connection(
|
||||
BleakClient,
|
||||
self._ble_device,
|
||||
self.name,
|
||||
disconnected_callback=self._on_disconnect,
|
||||
ble_device_callback=lambda: self._ble_device,
|
||||
services=[
|
||||
UUID_COV_SERVICE,
|
||||
# self.UUID_DEV_SERVICE,
|
||||
UUID_DEV_SERVICE,
|
||||
# self.UUID_BAT_SERVICE,
|
||||
],
|
||||
)
|
||||
|
||||
@@ -40,7 +40,9 @@ async def async_setup_entry(
|
||||
)
|
||||
|
||||
|
||||
class PVBinarySensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySensorEntity): # type: ignore[reportIncompatibleMethodOverride]
|
||||
class PVBinarySensor(
|
||||
PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySensorEntity
|
||||
): # type: ignore[reportIncompatibleMethodOverride]
|
||||
"""The generic PV binary sensor implementation."""
|
||||
|
||||
def __init__(
|
||||
@@ -49,7 +51,7 @@ class PVBinarySensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySen
|
||||
descr: BinarySensorEntityDescription,
|
||||
unique_id: str,
|
||||
) -> None:
|
||||
"""Intialize PV binary sensor."""
|
||||
"""Initialize PV binary sensor."""
|
||||
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
|
||||
self._attr_device_info = coord.device_info
|
||||
self._attr_has_entity_name = True
|
||||
|
||||
71
custom_components/hunterdouglas_powerview_ble/button.py
Normal file
71
custom_components/hunterdouglas_powerview_ble/button.py
Normal file
@@ -0,0 +1,71 @@
|
||||
"""Hunter Douglas Powerview cover."""
|
||||
|
||||
from typing import Final
|
||||
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
PassiveBluetoothCoordinatorEntity,
|
||||
)
|
||||
from homeassistant.components.button import (
|
||||
ButtonDeviceClass,
|
||||
ButtonEntity,
|
||||
ButtonEntityDescription,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import EntityCategory
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.device_registry import DeviceInfo, format_mac
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .coordinator import PVCoordinator
|
||||
|
||||
BUTTONS_SHADE: Final = [
|
||||
ButtonEntityDescription(
|
||||
key="identify",
|
||||
device_class=ButtonDeviceClass.IDENTIFY,
|
||||
entity_category=EntityCategory.DIAGNOSTIC,
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
_hass: HomeAssistant,
|
||||
config_entry: ConfigEntry,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up the demo cover platform."""
|
||||
|
||||
coordinator: PVCoordinator = config_entry.runtime_data
|
||||
for descr in BUTTONS_SHADE:
|
||||
async_add_entities([PowerViewButton(coordinator, descr)])
|
||||
|
||||
|
||||
class PowerViewButton(PassiveBluetoothCoordinatorEntity[PVCoordinator], ButtonEntity): # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Representation of a powerview shade."""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
_attr_device_class = ButtonDeviceClass.IDENTIFY
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: PVCoordinator,
|
||||
description: ButtonEntityDescription,
|
||||
) -> None:
|
||||
"""Initialize the shade."""
|
||||
self.entity_description = description
|
||||
self._coord: PVCoordinator = coordinator
|
||||
self._attr_device_info = self._coord.device_info
|
||||
self._attr_unique_id = (
|
||||
f"{DOMAIN}_{format_mac(self._coord.address)}_{ButtonDeviceClass.IDENTIFY}"
|
||||
)
|
||||
super().__init__(coordinator)
|
||||
|
||||
@property
|
||||
def device_info(self) -> DeviceInfo: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return the device_info of the device."""
|
||||
return self._coord.device_info
|
||||
|
||||
async def async_press(self) -> None:
|
||||
"""Handle the button press."""
|
||||
LOGGER.debug("identify cover")
|
||||
await self._coord.api.identify()
|
||||
@@ -1,9 +1,15 @@
|
||||
"""Config flow for BLE Battery Management System integration."""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import contextlib
|
||||
from dataclasses import dataclass
|
||||
import struct
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.bluetooth import (
|
||||
BluetoothServiceInfoBleak,
|
||||
@@ -11,19 +17,163 @@ from homeassistant.components.bluetooth import (
|
||||
)
|
||||
from homeassistant.config_entries import ConfigFlowResult
|
||||
from homeassistant.const import CONF_ADDRESS
|
||||
|
||||
# from homeassistant.helpers.device_registry import format_mac
|
||||
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.selector import (
|
||||
SelectOptionDict,
|
||||
SelectSelector,
|
||||
SelectSelectorConfig,
|
||||
TextSelector,
|
||||
TextSelectorConfig,
|
||||
TextSelectorType,
|
||||
)
|
||||
|
||||
from .api import UUID_COV_SERVICE as UUID
|
||||
from .const import DOMAIN, LOGGER, MFCT_ID
|
||||
from .const import CONF_HOME_KEY, DOMAIN, LOGGER, MFCT_ID
|
||||
|
||||
|
||||
def _needs_encryption(manufacturer_data_hex: str) -> bool:
|
||||
"""Return True if the BLE advertisement indicates encryption (home_id != 0)."""
|
||||
data = bytearray.fromhex(manufacturer_data_hex)
|
||||
if len(data) < 2:
|
||||
return False
|
||||
home_id = int.from_bytes(data[0:2], byteorder="little")
|
||||
return home_id != 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class HubShadeInfo:
|
||||
"""Shade metadata from the PowerView hub."""
|
||||
|
||||
name: str # Human-readable name (decoded from base64)
|
||||
ble_name: str # BLE advertisement name, e.g. "DUE:94ED"
|
||||
|
||||
|
||||
async def _fetch_shades_from_hub(
|
||||
hass: HomeAssistant, hub_url: str
|
||||
) -> list[HubShadeInfo]:
|
||||
"""Fetch shade list with human-readable names from a PowerView G3 hub.
|
||||
|
||||
Raises aiohttp.ClientError on network errors.
|
||||
Raises asyncio.TimeoutError on timeout.
|
||||
"""
|
||||
session = async_get_clientsession(hass)
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
|
||||
async with session.get(f"{hub_url}/home/shades", timeout=timeout) as resp:
|
||||
resp.raise_for_status()
|
||||
shades = await resp.json(content_type=None)
|
||||
|
||||
if not shades:
|
||||
return []
|
||||
|
||||
hub_shades: list[HubShadeInfo] = []
|
||||
for shade in shades:
|
||||
ble_name = shade.get("bleName", "")
|
||||
if not ble_name:
|
||||
continue
|
||||
name_b64 = shade.get("name", "")
|
||||
try:
|
||||
name = base64.b64decode(name_b64).decode("utf-8") if name_b64 else ble_name
|
||||
except Exception: # noqa: BLE001
|
||||
name = ble_name
|
||||
hub_shades.append(HubShadeInfo(name=name, ble_name=ble_name))
|
||||
return hub_shades
|
||||
|
||||
|
||||
async def _fetch_key_and_shades_from_hub(
|
||||
hass: HomeAssistant, hub_url: str
|
||||
) -> tuple[bytes, list[HubShadeInfo]]:
|
||||
"""Fetch 16-byte homekey and shade list from a PowerView G3 hub.
|
||||
|
||||
Returns (key, shade_list). The key is network-wide so any reachable shade
|
||||
returns the same value. The shade list contains human-readable names that
|
||||
can be used to label BLE-discovered devices.
|
||||
|
||||
Raises ValueError on protocol/key errors.
|
||||
Raises aiohttp.ClientError on network errors.
|
||||
Raises asyncio.TimeoutError on timeout.
|
||||
"""
|
||||
hub_shades = await _fetch_shades_from_hub(hass, hub_url)
|
||||
if not hub_shades:
|
||||
raise ValueError("No shades found on the hub")
|
||||
|
||||
session = async_get_clientsession(hass)
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
|
||||
# GetShadeKey BLE request: sid=251, cid=18, seqId=1, data_len=0
|
||||
request_frame = struct.pack("<BBBB", 251, 18, 1, 0)
|
||||
|
||||
# Try each shade until one returns a valid key (some may be out of range)
|
||||
last_error: Exception = ValueError("No shades responded")
|
||||
for hs in hub_shades:
|
||||
try:
|
||||
async with session.post(
|
||||
f"{hub_url}/home/shades/exec?shades={hs.ble_name}",
|
||||
json={"hex": request_frame.hex()},
|
||||
timeout=timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
result = await resp.json(content_type=None)
|
||||
except (TimeoutError, aiohttp.ClientError) as ex:
|
||||
last_error = ex
|
||||
continue
|
||||
|
||||
responses = result.get("responses", [])
|
||||
if len(responses) != 1 or "hex" not in responses[0]:
|
||||
continue
|
||||
|
||||
response_bytes = bytes.fromhex(responses[0]["hex"])
|
||||
if len(response_bytes) < 5:
|
||||
continue
|
||||
_s, _c, _q, length = struct.unpack("<BBBB", response_bytes[0:4])
|
||||
if len(response_bytes) != 4 + length:
|
||||
continue
|
||||
if response_bytes[4] != 0:
|
||||
continue
|
||||
key_data = response_bytes[5:]
|
||||
if len(key_data) != 16:
|
||||
continue
|
||||
return key_data, hub_shades
|
||||
|
||||
raise ValueError(f"No reachable shade returned a valid key: {last_error}")
|
||||
|
||||
|
||||
_HOMEKEY_SCHEMA = vol.Schema(
|
||||
{
|
||||
vol.Required("key_method", default="hub"): SelectSelector(
|
||||
SelectSelectorConfig(
|
||||
options=[
|
||||
SelectOptionDict(
|
||||
value="hub",
|
||||
label="Fetch automatically from PowerView hub",
|
||||
),
|
||||
SelectOptionDict(
|
||||
value="manual",
|
||||
label="Enter key manually (32 hex characters)",
|
||||
),
|
||||
SelectOptionDict(
|
||||
value="skip",
|
||||
label="Skip (no key — controls disabled for encrypted shades)",
|
||||
),
|
||||
]
|
||||
)
|
||||
),
|
||||
vol.Optional("hub_url", default="http://powerview-g3.local"): TextSelector(
|
||||
TextSelectorConfig(type=TextSelectorType.URL)
|
||||
),
|
||||
vol.Optional("home_key", default=""): TextSelector(
|
||||
TextSelectorConfig(type=TextSelectorType.TEXT)
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for BT Battery Management System."""
|
||||
|
||||
VERSION = 1
|
||||
MINOR_VERSION = 0
|
||||
MINOR_VERSION = 1
|
||||
|
||||
@dataclass
|
||||
class DiscoveredDevice:
|
||||
@@ -37,6 +187,83 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
|
||||
self._discovered_device: ConfigFlow.DiscoveredDevice | None = None
|
||||
self._discovered_devices: dict[str, ConfigFlow.DiscoveredDevice] = {}
|
||||
self._manufacturer_data_hex: str = ""
|
||||
self._device_name: str = ""
|
||||
self._home_key: str = ""
|
||||
self._hub_url: str = ""
|
||||
self._hub_shades: list[HubShadeInfo] = []
|
||||
|
||||
def _create_entry(self) -> ConfigFlowResult:
|
||||
"""Create the config entry with collected data."""
|
||||
data: dict[str, str] = {
|
||||
"manufacturer_data": self._manufacturer_data_hex,
|
||||
CONF_HOME_KEY: self._home_key,
|
||||
}
|
||||
if self._hub_url:
|
||||
data["hub_url"] = self._hub_url
|
||||
return self.async_create_entry(
|
||||
title=self._device_name,
|
||||
data=data,
|
||||
)
|
||||
|
||||
def _validate_manual_key(
|
||||
self, user_input: dict[str, Any], errors: dict[str, str]
|
||||
) -> bool:
|
||||
"""Validate a manually entered hex key and store it.
|
||||
|
||||
Returns True on success, False on validation error.
|
||||
"""
|
||||
raw = user_input.get("home_key", "").strip()
|
||||
if "\\x" in raw:
|
||||
raw = raw.replace("\\x", "")
|
||||
if len(raw) != 32:
|
||||
errors["home_key"] = "invalid_key_length"
|
||||
return False
|
||||
try:
|
||||
bytes.fromhex(raw)
|
||||
except ValueError:
|
||||
errors["home_key"] = "invalid_key_format"
|
||||
return False
|
||||
self._home_key = raw.lower()
|
||||
return True
|
||||
|
||||
async def _validate_homekey_input(
|
||||
self, user_input: dict[str, Any], errors: dict[str, str]
|
||||
) -> bool:
|
||||
"""Parse and validate homekey user_input, populating self state.
|
||||
|
||||
Returns True on success, False on validation error (errors dict is populated).
|
||||
On skip, self._home_key is set to "".
|
||||
"""
|
||||
method = user_input.get("key_method", "skip")
|
||||
|
||||
if method == "skip":
|
||||
self._home_key = ""
|
||||
return True
|
||||
|
||||
if method == "manual":
|
||||
return self._validate_manual_key(user_input, errors)
|
||||
|
||||
if method != "hub":
|
||||
return False
|
||||
|
||||
hub_url = user_input.get("hub_url", "").rstrip("/")
|
||||
_HUB_ERROR_MAP: dict[type[Exception], str] = {
|
||||
aiohttp.ClientResponseError: "hub_http_error",
|
||||
aiohttp.ClientConnectionError: "hub_connection_error",
|
||||
TimeoutError: "hub_timeout",
|
||||
ValueError: "hub_protocol_error",
|
||||
}
|
||||
try:
|
||||
key, hub_shades = await _fetch_key_and_shades_from_hub(self.hass, hub_url)
|
||||
except tuple(_HUB_ERROR_MAP) as ex:
|
||||
errors["hub_url"] = _HUB_ERROR_MAP[type(ex)]
|
||||
return False
|
||||
|
||||
self._home_key = key.hex()
|
||||
self._hub_url = hub_url
|
||||
self._hub_shades = hub_shades
|
||||
return True
|
||||
|
||||
async def async_step_bluetooth(
|
||||
self, discovery_info: BluetoothServiceInfoBleak
|
||||
@@ -61,10 +288,17 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
LOGGER.debug("confirm step for %s", self._discovered_device.name)
|
||||
|
||||
if user_input is not None:
|
||||
return self.async_create_entry(
|
||||
title=self._discovered_device.name,
|
||||
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
|
||||
self._manufacturer_data_hex = (
|
||||
self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()
|
||||
)
|
||||
self._device_name = self._discovered_device.name
|
||||
|
||||
# Unencrypted shades can skip the homekey step entirely
|
||||
if not _needs_encryption(self._manufacturer_data_hex):
|
||||
await self._resolve_friendly_name()
|
||||
return self._create_entry()
|
||||
|
||||
return await self.async_step_homekey_bluetooth()
|
||||
|
||||
self._set_confirm_only()
|
||||
|
||||
@@ -73,25 +307,153 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
description_placeholders={"name": self._discovered_device.name},
|
||||
)
|
||||
|
||||
async def async_step_homekey_bluetooth(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Configure homekey for a shade discovered via Bluetooth."""
|
||||
# Reuse an existing key if another shade was already configured
|
||||
existing = self._existing_home_key()
|
||||
if existing and user_input is None:
|
||||
self._home_key = existing
|
||||
await self._resolve_friendly_name()
|
||||
return self._create_entry()
|
||||
|
||||
errors: dict[str, str] = {}
|
||||
|
||||
if user_input is not None and await self._validate_homekey_input(
|
||||
user_input, errors
|
||||
):
|
||||
# Use hub name for the entry title if available
|
||||
friendly = self._hub_name_for(self._device_name)
|
||||
if friendly:
|
||||
self._device_name = friendly
|
||||
return self._create_entry()
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="homekey_bluetooth",
|
||||
data_schema=_HOMEKEY_SCHEMA,
|
||||
errors=errors,
|
||||
description_placeholders={
|
||||
"name": self._device_name,
|
||||
"hub_url_example": "http://powerview-g3.local",
|
||||
},
|
||||
)
|
||||
|
||||
def _existing_entry_value(self, key: str) -> str:
|
||||
"""Return the first non-empty value for *key* across configured entries."""
|
||||
for entry in self._async_current_entries():
|
||||
if value := entry.data.get(key, ""):
|
||||
return value
|
||||
return ""
|
||||
|
||||
def _existing_home_key(self) -> str:
|
||||
"""Return the home_key from any already-configured entry, or ''."""
|
||||
return self._existing_entry_value(CONF_HOME_KEY)
|
||||
|
||||
async def _resolve_friendly_name(self) -> None:
|
||||
"""Try to resolve BLE device name to hub friendly name."""
|
||||
hub_url = self._hub_url or self._existing_entry_value("hub_url")
|
||||
if not hub_url:
|
||||
return
|
||||
try:
|
||||
shades = await _fetch_shades_from_hub(self.hass, hub_url)
|
||||
for hs in shades:
|
||||
if hs.ble_name == self._device_name:
|
||||
self._device_name = hs.name
|
||||
break
|
||||
if not self._hub_url:
|
||||
self._hub_url = hub_url
|
||||
except (TimeoutError, aiohttp.ClientError, ValueError):
|
||||
pass
|
||||
|
||||
def _hub_name_for(self, ble_name: str) -> str | None:
|
||||
"""Return the human-readable hub name for a BLE name, or None."""
|
||||
for hs in self._hub_shades:
|
||||
if hs.ble_name == ble_name:
|
||||
return hs.name
|
||||
return None
|
||||
|
||||
async def async_step_user(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle the user step to pick discovered device."""
|
||||
"""Handle the user step — reuse existing key or offer a menu."""
|
||||
LOGGER.debug("user step")
|
||||
existing = self._existing_home_key()
|
||||
if existing:
|
||||
self._home_key = existing
|
||||
self._hub_url = self._hub_url or self._existing_entry_value("hub_url")
|
||||
if self._hub_url and not self._hub_shades:
|
||||
with contextlib.suppress(
|
||||
TimeoutError, aiohttp.ClientError, ValueError
|
||||
):
|
||||
self._hub_shades = await _fetch_shades_from_hub(
|
||||
self.hass, self._hub_url
|
||||
)
|
||||
return self.async_show_menu(
|
||||
step_id="user",
|
||||
menu_options=["select_device", "manual"],
|
||||
)
|
||||
return await self.async_step_homekey()
|
||||
|
||||
def _build_selected_entries(
|
||||
self, user_input: dict[str, Any]
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Build config entry data for each selected shade address."""
|
||||
addresses: list[str] = user_input[CONF_ADDRESS]
|
||||
if isinstance(addresses, str):
|
||||
addresses = [addresses]
|
||||
|
||||
entries: list[dict[str, Any]] = []
|
||||
for address in addresses:
|
||||
device = self._discovered_devices[address]
|
||||
ble_name = device.name
|
||||
name = self._hub_name_for(ble_name) or ble_name
|
||||
mfct_hex = device.discovery_info.manufacturer_data[MFCT_ID].hex()
|
||||
entry_data: dict[str, str] = {
|
||||
"manufacturer_data": mfct_hex,
|
||||
CONF_HOME_KEY: self._home_key,
|
||||
}
|
||||
if self._hub_url:
|
||||
entry_data["hub_url"] = self._hub_url
|
||||
entries.append(
|
||||
{
|
||||
"address": address,
|
||||
"name": name,
|
||||
"data": entry_data,
|
||||
}
|
||||
)
|
||||
return entries
|
||||
|
||||
async def async_step_select_device(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Select one or more BLE-discovered shades, or fall through to manual."""
|
||||
LOGGER.debug("select_device step")
|
||||
|
||||
if user_input is not None:
|
||||
address = user_input[CONF_ADDRESS]
|
||||
await self.async_set_unique_id(address, raise_on_progress=False)
|
||||
self._abort_if_unique_id_configured()
|
||||
self._discovered_device = self._discovered_devices[address]
|
||||
entries = self._build_selected_entries(user_input)
|
||||
|
||||
self.context["title_placeholders"] = {"name": self._discovered_device.name}
|
||||
|
||||
return self.async_create_entry(
|
||||
title=self._discovered_device.name,
|
||||
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
|
||||
# Kick off auto-add flows for all but the last shade
|
||||
await asyncio.gather(
|
||||
*(
|
||||
self.hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={"source": "auto_add"},
|
||||
data=info,
|
||||
)
|
||||
for info in entries[:-1]
|
||||
)
|
||||
)
|
||||
|
||||
# Create the final entry normally (ends this flow)
|
||||
last = entries[-1]
|
||||
await self.async_set_unique_id(last["address"], raise_on_progress=False)
|
||||
self._abort_if_unique_id_configured()
|
||||
self._device_name = last["name"]
|
||||
self._manufacturer_data_hex = last["data"]["manufacturer_data"]
|
||||
self.context["title_placeholders"] = {"name": self._device_name}
|
||||
return self._create_entry()
|
||||
|
||||
current_addresses = self._async_current_ids()
|
||||
for discovery_info in async_discovered_service_info(self.hass, False):
|
||||
address = discovery_info.address
|
||||
@@ -109,19 +471,96 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
)
|
||||
|
||||
if not self._discovered_devices:
|
||||
return self.async_abort(reason="no_devices_found")
|
||||
return await self.async_step_manual()
|
||||
|
||||
titles = []
|
||||
titles: list[SelectOptionDict] = []
|
||||
for address, discovery in self._discovered_devices.items():
|
||||
titles.append({"value": address, "label": discovery.name})
|
||||
hub_name = self._hub_name_for(discovery.name)
|
||||
label = f"{hub_name} ({discovery.name})" if hub_name else discovery.name
|
||||
titles.append({"value": address, "label": label})
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
step_id="select_device",
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_ADDRESS): SelectSelector(
|
||||
SelectSelectorConfig(options=titles)
|
||||
SelectSelectorConfig(options=titles, multiple=True)
|
||||
)
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
async def async_step_auto_add(
|
||||
self, discovery_info: dict[str, Any]
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle a shade queued from multi-select for individual setup."""
|
||||
await self.async_set_unique_id(discovery_info["address"])
|
||||
self._abort_if_unique_id_configured()
|
||||
|
||||
self._device_name = discovery_info["name"]
|
||||
self._manufacturer_data_hex = discovery_info["data"]["manufacturer_data"]
|
||||
self._home_key = discovery_info["data"].get(CONF_HOME_KEY, "")
|
||||
self._hub_url = discovery_info["data"].get("hub_url", "")
|
||||
|
||||
self.context["title_placeholders"] = {"name": self._device_name}
|
||||
return await self.async_step_auto_add_confirm()
|
||||
|
||||
async def async_step_auto_add_confirm(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Confirm adding a shade discovered via multi-select."""
|
||||
if user_input is not None:
|
||||
return self._create_entry()
|
||||
|
||||
self._set_confirm_only()
|
||||
return self.async_show_form(
|
||||
step_id="auto_add_confirm",
|
||||
description_placeholders={"name": self._device_name},
|
||||
)
|
||||
|
||||
async def async_step_manual(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle manual entry of a BLE device address and name."""
|
||||
if user_input is not None:
|
||||
address = user_input[CONF_ADDRESS].upper().strip()
|
||||
self._device_name = user_input["ble_name"].strip()
|
||||
await self.async_set_unique_id(address, raise_on_progress=False)
|
||||
self._abort_if_unique_id_configured()
|
||||
self.context["title_placeholders"] = {"name": self._device_name}
|
||||
self._manufacturer_data_hex = ""
|
||||
return self._create_entry()
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="manual",
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_ADDRESS): TextSelector(
|
||||
TextSelectorConfig(type=TextSelectorType.TEXT)
|
||||
),
|
||||
vol.Required("ble_name"): TextSelector(
|
||||
TextSelectorConfig(type=TextSelectorType.TEXT)
|
||||
),
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
async def async_step_homekey(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Configure homekey — collected before device selection."""
|
||||
errors: dict[str, str] = {}
|
||||
|
||||
if user_input is not None and await self._validate_homekey_input(
|
||||
user_input, errors
|
||||
):
|
||||
return await self.async_step_select_device()
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="homekey",
|
||||
data_schema=_HOMEKEY_SCHEMA,
|
||||
errors=errors,
|
||||
description_placeholders={
|
||||
"hub_url_example": "http://powerview-g3.local",
|
||||
},
|
||||
)
|
||||
|
||||
@@ -3,25 +3,12 @@
|
||||
import logging
|
||||
from typing import Final
|
||||
|
||||
# from bleak.uuids import normalize_uuid_str
|
||||
|
||||
# from homeassistant.const import ( # noqa: F401
|
||||
# ATTR_BATTERY_CHARGING,
|
||||
# ATTR_BATTERY_LEVEL,
|
||||
# ATTR_TEMPERATURE,
|
||||
# ATTR_VOLTAGE,
|
||||
# )
|
||||
|
||||
|
||||
DOMAIN: Final[str] = "hunterdouglas_powerview_ble"
|
||||
LOGGER: Final = logging.getLogger(__package__)
|
||||
MFCT_ID: Final[int] = 2073
|
||||
TIMEOUT: Final[int] = 5
|
||||
|
||||
# put the key here, needs to be 16 bytes long, e.g.
|
||||
# HOME_KEY: Final[bytes] = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
|
||||
HOME_KEY: Final[bytes] = b""
|
||||
|
||||
CONF_HOME_KEY: Final[str] = "home_key"
|
||||
|
||||
# attributes (do not change)
|
||||
ATTR_RSSI: Final[str] = "rssi"
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
from typing import Any
|
||||
|
||||
from bleak.backends.device import BLEDevice
|
||||
|
||||
from homeassistant.components import bluetooth
|
||||
from homeassistant.components.bluetooth.const import DOMAIN as BLUETOOTH_DOMAIN
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
@@ -12,26 +13,35 @@ from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers.device_registry import CONNECTION_BLUETOOTH, DeviceInfo
|
||||
|
||||
from .api import SHADE_TYPE, PowerViewBLE
|
||||
from .const import ATTR_RSSI, DOMAIN, HOME_KEY, LOGGER
|
||||
from .const import ATTR_RSSI, CONF_HOME_KEY, DOMAIN, LOGGER
|
||||
|
||||
|
||||
class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
|
||||
"""Update coordinator for a battery management system."""
|
||||
|
||||
def __init__(
|
||||
self, hass: HomeAssistant, ble_device: BLEDevice, data: dict[str, Any]
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
ble_device: BLEDevice,
|
||||
data: dict[str, Any],
|
||||
friendly_name: str | None = None,
|
||||
) -> None:
|
||||
"""Initialize BMS data coordinator."""
|
||||
assert ble_device.name is not None
|
||||
self._mac = ble_device.address
|
||||
self.api = PowerViewBLE(ble_device, HOME_KEY)
|
||||
self._friendly_name = friendly_name or ble_device.name
|
||||
home_key_hex: str = data.get(CONF_HOME_KEY, "")
|
||||
home_key: bytes = (
|
||||
bytes.fromhex(home_key_hex) if len(home_key_hex) == 32 else b""
|
||||
)
|
||||
self.api = PowerViewBLE(ble_device, home_key)
|
||||
self.data: dict[str, int | float | bool] = {}
|
||||
self._manuf_dat = data.get("manufacturer_data")
|
||||
self.dev_details: dict[str, str] = {}
|
||||
|
||||
LOGGER.debug(
|
||||
"Initializing coordinator for %s (%s)",
|
||||
ble_device.name,
|
||||
self._friendly_name,
|
||||
ble_device.address,
|
||||
)
|
||||
super().__init__(
|
||||
@@ -49,16 +59,15 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
|
||||
@property
|
||||
def device_info(self) -> DeviceInfo:
|
||||
"""Return detailed device information for GUI."""
|
||||
LOGGER.debug("%s: device_info, %s", self.name, self.dev_details)
|
||||
LOGGER.debug("%s: device_info, %s", self._friendly_name, self.dev_details)
|
||||
return DeviceInfo(
|
||||
identifiers={
|
||||
(DOMAIN, self.name),
|
||||
(DOMAIN, self.address),
|
||||
(BLUETOOTH_DOMAIN, self.address),
|
||||
},
|
||||
connections={(CONNECTION_BLUETOOTH, self.address)},
|
||||
name=self.name,
|
||||
name=self._friendly_name,
|
||||
configuration_url=None,
|
||||
# properties used in GUI:
|
||||
manufacturer="Hunter Douglas",
|
||||
model=(
|
||||
str(SHADE_TYPE.get(int(bytes.fromhex(self._manuf_dat)[2]), "unknown"))
|
||||
@@ -80,7 +89,7 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
|
||||
|
||||
def _async_stop(self) -> None:
|
||||
"""Shutdown coordinator and any connection."""
|
||||
LOGGER.debug("%s: shuting down BMS device", self.name)
|
||||
LOGGER.debug("%s: shutting down BMS device", self.name)
|
||||
self.hass.async_create_task(self.api.disconnect())
|
||||
super()._async_stop()
|
||||
|
||||
@@ -92,10 +101,8 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
|
||||
) -> None:
|
||||
"""Handle a Bluetooth event."""
|
||||
|
||||
# if not self.dev_details:
|
||||
# self.hass.async_create_task(self._get_device_info())
|
||||
|
||||
LOGGER.debug("BLE event %s: %s", change, service_info.manufacturer_data)
|
||||
self.api.set_ble_device(service_info.device)
|
||||
self.data = {ATTR_RSSI: service_info.rssi}
|
||||
if change == bluetooth.BluetoothChange.ADVERTISEMENT:
|
||||
self.data.update(
|
||||
|
||||
@@ -3,12 +3,15 @@
|
||||
from typing import Any, Final
|
||||
|
||||
from bleak.exc import BleakError
|
||||
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
PassiveBluetoothCoordinatorEntity,
|
||||
)
|
||||
from homeassistant.components.cover import (
|
||||
ATTR_CURRENT_POSITION,
|
||||
ATTR_CURRENT_TILT_POSITION,
|
||||
ATTR_POSITION,
|
||||
ATTR_TILT_POSITION,
|
||||
CoverDeviceClass,
|
||||
CoverEntity,
|
||||
CoverEntityFeature,
|
||||
@@ -19,7 +22,7 @@ from homeassistant.helpers.device_registry import DeviceInfo, format_mac
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from .api import CLOSED_POSITION, OPEN_POSITION
|
||||
from .const import DOMAIN, HOME_KEY, LOGGER
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .coordinator import PVCoordinator
|
||||
|
||||
|
||||
@@ -31,11 +34,18 @@ async def async_setup_entry(
|
||||
"""Set up the demo cover platform."""
|
||||
|
||||
coordinator: PVCoordinator = config_entry.runtime_data
|
||||
async_add_entities([PowerViewCover(coordinator)])
|
||||
model: Final[str | None] = coordinator.dev_details.get("model")
|
||||
entities: list[PowerViewCover] = []
|
||||
if model == "39":
|
||||
entities.append(PowerViewCoverTiltOnly(coordinator))
|
||||
else:
|
||||
entities.append(PowerViewCover(coordinator))
|
||||
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Representation of a powerview shade."""
|
||||
"""Representation of a PowerView shade with Up/Down functionality only."""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
_attr_device_class = CoverDeviceClass.SHADE
|
||||
@@ -51,8 +61,9 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
coordinator: PVCoordinator,
|
||||
) -> None:
|
||||
"""Initialize the shade."""
|
||||
self._attr_name = CoverDeviceClass.SHADE
|
||||
self._coord = coordinator
|
||||
LOGGER.debug("%s: init() PowerViewCover", coordinator.name)
|
||||
self._attr_name = None
|
||||
self._coord: PVCoordinator = coordinator
|
||||
self._attr_device_info = self._coord.device_info
|
||||
self._target_position: int | None = round(
|
||||
self._coord.data.get(ATTR_CURRENT_POSITION, OPEN_POSITION)
|
||||
@@ -96,7 +107,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
def supported_features(self) -> CoverEntityFeature: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Flag supported features, disable control if encryption is needed."""
|
||||
if (
|
||||
self._coord.data.get("home_id") and len(HOME_KEY) != 16
|
||||
self._coord.data.get("home_id") and not self._coord.api.has_key
|
||||
) or self._coord.data.get("battery_charging"):
|
||||
return CoverEntityFeature(0)
|
||||
|
||||
@@ -170,3 +181,117 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error("Failed to stop cover '%s': %s", self.name, err)
|
||||
|
||||
|
||||
class PowerViewCoverTilt(PowerViewCover):
|
||||
"""Representation of a PowerView shade with additional tilt functionality."""
|
||||
|
||||
_attr_supported_features = (
|
||||
CoverEntityFeature.OPEN
|
||||
| CoverEntityFeature.CLOSE
|
||||
| CoverEntityFeature.STOP
|
||||
| CoverEntityFeature.SET_POSITION
|
||||
| CoverEntityFeature.OPEN_TILT
|
||||
| CoverEntityFeature.CLOSE_TILT
|
||||
| CoverEntityFeature.STOP_TILT
|
||||
| CoverEntityFeature.SET_TILT_POSITION
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: PVCoordinator,
|
||||
) -> None:
|
||||
"""Initialize the shade with tilt."""
|
||||
LOGGER.debug("%s: init() PowerViewCoverTilt", coordinator.name)
|
||||
super().__init__(coordinator)
|
||||
|
||||
@property
|
||||
def current_cover_tilt_position(self) -> int | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return current tilt of cover.
|
||||
|
||||
None is unknown
|
||||
"""
|
||||
pos: Final = self._coord.data.get(ATTR_CURRENT_TILT_POSITION)
|
||||
return round(pos) if pos is not None else None
|
||||
|
||||
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
|
||||
"""Move the tilt to a specific position."""
|
||||
|
||||
if isinstance(target_position := kwargs.get(ATTR_TILT_POSITION), int):
|
||||
LOGGER.debug("set cover tilt to position %i", target_position)
|
||||
if (
|
||||
self.current_cover_tilt_position == round(target_position)
|
||||
or self.current_cover_position is None
|
||||
):
|
||||
return
|
||||
|
||||
try:
|
||||
await self._coord.api.set_position(
|
||||
self.current_cover_position, tilt=target_position
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error(
|
||||
"Failed to tilt cover '%s' to %f%%: %s",
|
||||
self.name,
|
||||
target_position,
|
||||
err,
|
||||
)
|
||||
|
||||
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Stop the cover."""
|
||||
await self.async_stop_cover(kwargs=kwargs)
|
||||
|
||||
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Open the cover tilt."""
|
||||
LOGGER.debug("open cover tilt")
|
||||
_kwargs = {**kwargs, ATTR_TILT_POSITION: OPEN_POSITION}
|
||||
await self.async_set_cover_tilt_position(**_kwargs)
|
||||
|
||||
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Close the cover tilt."""
|
||||
LOGGER.debug("close cover tilt")
|
||||
_kwargs = {**kwargs, ATTR_TILT_POSITION: CLOSED_POSITION}
|
||||
await self.async_set_cover_tilt_position(**_kwargs)
|
||||
|
||||
|
||||
class PowerViewCoverTiltOnly(PowerViewCoverTilt):
|
||||
"""Representation of a PowerView shade with additional tilt functionality."""
|
||||
|
||||
OPENCLOSED_THRESHOLD = 5
|
||||
|
||||
_attr_device_class = CoverDeviceClass.BLIND
|
||||
_attr_supported_features = (
|
||||
CoverEntityFeature.OPEN_TILT
|
||||
| CoverEntityFeature.CLOSE_TILT
|
||||
| CoverEntityFeature.STOP_TILT
|
||||
| CoverEntityFeature.SET_TILT_POSITION
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: PVCoordinator,
|
||||
) -> None:
|
||||
"""Initialize the shade with tilt only."""
|
||||
LOGGER.debug("%s: init() PowerViewCoverTiltOnly", coordinator.name)
|
||||
super().__init__(coordinator)
|
||||
|
||||
@property
|
||||
def is_opening(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return if the cover is opening or not."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def is_closing(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return if the cover is closing or not."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def is_closed(self) -> bool: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return if the cover is closed."""
|
||||
return isinstance(self.current_cover_tilt_position, int) and (
|
||||
self.current_cover_tilt_position
|
||||
>= OPEN_POSITION - PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
|
||||
or self.current_cover_tilt_position
|
||||
<= CLOSED_POSITION + PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
|
||||
)
|
||||
|
||||
@@ -4,8 +4,7 @@
|
||||
"bluetooth": [
|
||||
{
|
||||
"service_uuid": "0000fdc1-0000-1000-8000-00805f9b34fb",
|
||||
"manufacturer_id": 2073,
|
||||
"manufacturer_data_start": [0,0]
|
||||
"manufacturer_id": 2073
|
||||
}
|
||||
],
|
||||
"codeowners": ["@patman15"],
|
||||
@@ -17,5 +16,5 @@
|
||||
"issue_tracker": "https://github.com/patman15/hdpv_ble/issues",
|
||||
"loggers": ["hunterdouglas_powerview_ble"],
|
||||
"requirements": ["cryptography>=43.0.0"],
|
||||
"version": "0.22"
|
||||
"version": "0.24"
|
||||
}
|
||||
|
||||
@@ -3,14 +3,8 @@
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
PassiveBluetoothCoordinatorEntity,
|
||||
)
|
||||
from homeassistant.components.sensor import (
|
||||
SensorEntity,
|
||||
SensorEntityDescription,
|
||||
)
|
||||
from homeassistant.components.sensor.const import (
|
||||
SensorDeviceClass,
|
||||
SensorStateClass,
|
||||
)
|
||||
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
|
||||
from homeassistant.components.sensor.const import SensorDeviceClass, SensorStateClass
|
||||
from homeassistant.const import (
|
||||
ATTR_BATTERY_LEVEL,
|
||||
PERCENTAGE,
|
||||
@@ -67,7 +61,7 @@ class PVSensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], SensorEntity):
|
||||
def __init__(
|
||||
self, pv_dev: PVCoordinator, descr: SensorEntityDescription, unique_id: str
|
||||
) -> None:
|
||||
"""Intitialize the BMS sensor."""
|
||||
"""Initialize the BMS sensor."""
|
||||
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
|
||||
self._attr_device_info = pv_dev.device_info
|
||||
self.entity_description = descr
|
||||
|
||||
@@ -2,10 +2,73 @@
|
||||
"config": {
|
||||
"flow_title": "Setup {name}",
|
||||
"step": {
|
||||
"user": {
|
||||
"title": "Add PowerView Shade",
|
||||
"menu_options": {
|
||||
"select_device": "Select from discovered shades",
|
||||
"manual": "Enter device details manually"
|
||||
},
|
||||
"menu_option_descriptions": {
|
||||
"select_device": "Choose from shades detected via Bluetooth nearby.",
|
||||
"manual": "Enter a Bluetooth MAC address and device name directly, for example if a shade is out of range of discovery."
|
||||
}
|
||||
},
|
||||
"bluetooth_confirm": {
|
||||
"description": "[%key:component::bluetooth::config::step::bluetooth_confirm::description%]"
|
||||
},
|
||||
"homekey": {
|
||||
"title": "Configure HomeKey",
|
||||
"description": "All shades on a PowerView network share the same encryption key. The recommended method is to fetch it from your G3 hub. You can also enter the key manually, or skip if your shades are unencrypted.",
|
||||
"data": {
|
||||
"key_method": "Key source",
|
||||
"hub_url": "PowerView hub URL",
|
||||
"home_key": "HomeKey (32 hex characters or \\xNN format)"
|
||||
},
|
||||
"data_description": {
|
||||
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
|
||||
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
|
||||
}
|
||||
},
|
||||
"homekey_bluetooth": {
|
||||
"title": "Configure HomeKey for {name}",
|
||||
"description": "This shade uses encryption. The recommended method is to fetch the key from your G3 hub. You can also enter it manually, or skip (controls will be disabled until a key is configured).",
|
||||
"data": {
|
||||
"key_method": "Key source",
|
||||
"hub_url": "PowerView hub URL",
|
||||
"home_key": "HomeKey (32 hex characters or \\xNN format)"
|
||||
},
|
||||
"data_description": {
|
||||
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
|
||||
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
|
||||
}
|
||||
},
|
||||
"auto_add_confirm": {
|
||||
"description": "Do you want to set up {name}?"
|
||||
},
|
||||
"select_device": {
|
||||
"title": "Select Shades",
|
||||
"description": "Select the PowerView shades to add via Bluetooth.",
|
||||
"data": {
|
||||
"address": "Shades"
|
||||
}
|
||||
},
|
||||
"manual": {
|
||||
"title": "Enter Device Details",
|
||||
"description": "Enter the device details manually.",
|
||||
"data": {
|
||||
"address": "Bluetooth MAC address (e.g. AA:BB:CC:DD:EE:FF)",
|
||||
"ble_name": "BLE device name (e.g. DUE:94ED)"
|
||||
}
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
"invalid_key_format": "HomeKey must be a valid hexadecimal string",
|
||||
"invalid_key_length": "HomeKey must be exactly 32 hex characters (16 bytes)",
|
||||
"hub_connection_error": "Cannot connect to the PowerView hub",
|
||||
"hub_http_error": "Hub returned an HTTP error",
|
||||
"hub_timeout": "Connection to hub timed out",
|
||||
"hub_protocol_error": "Hub returned an unexpected response"
|
||||
},
|
||||
"abort": {
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
|
||||
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]",
|
||||
|
||||
@@ -1,15 +1,75 @@
|
||||
{
|
||||
"config": {
|
||||
"flow_title": "Setup {name}",
|
||||
"step": {
|
||||
"user": {
|
||||
"title": "Add PowerView Shade",
|
||||
"menu_options": {
|
||||
"select_device": "Select from discovered shades",
|
||||
"manual": "Enter device details manually"
|
||||
},
|
||||
"menu_option_descriptions": {
|
||||
"select_device": "Choose from shades detected via Bluetooth nearby.",
|
||||
"manual": "Enter a Bluetooth MAC address and device name directly, for example if a shade is out of range of discovery."
|
||||
}
|
||||
},
|
||||
"bluetooth_confirm": {
|
||||
"description": "Do you want to set up {name}?"
|
||||
},
|
||||
"homekey": {
|
||||
"title": "Configure HomeKey",
|
||||
"description": "All shades on a PowerView network share the same encryption key. The recommended method is to fetch it from your G3 hub. You can also enter the key manually, or skip if your shades are unencrypted.",
|
||||
"data": {
|
||||
"key_method": "Key source",
|
||||
"hub_url": "PowerView hub URL",
|
||||
"home_key": "HomeKey (32 hex characters or \\xNN format)"
|
||||
},
|
||||
"data_description": {
|
||||
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
|
||||
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
|
||||
}
|
||||
},
|
||||
"homekey_bluetooth": {
|
||||
"title": "Configure HomeKey for {name}",
|
||||
"description": "This shade uses encryption. The recommended method is to fetch the key from your G3 hub. You can also enter it manually, or skip (controls will be disabled until a key is configured).",
|
||||
"data": {
|
||||
"key_method": "Key source",
|
||||
"hub_url": "PowerView hub URL",
|
||||
"home_key": "HomeKey (32 hex characters or \\xNN format)"
|
||||
},
|
||||
"data_description": {
|
||||
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
|
||||
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
|
||||
}
|
||||
},
|
||||
"select_device": {
|
||||
"title": "Select Shades",
|
||||
"description": "Select the PowerView shades to add via Bluetooth.",
|
||||
"data": {
|
||||
"address": "Shades"
|
||||
}
|
||||
},
|
||||
"manual": {
|
||||
"title": "Enter Device Details",
|
||||
"description": "Enter the device details manually.",
|
||||
"data": {
|
||||
"address": "Bluetooth MAC address (e.g. AA:BB:CC:DD:EE:FF)",
|
||||
"ble_name": "BLE device name (e.g. DUE:94ED)"
|
||||
}
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
"invalid_key_format": "HomeKey must be a valid hexadecimal string",
|
||||
"invalid_key_length": "HomeKey must be exactly 32 hex characters (16 bytes)",
|
||||
"hub_connection_error": "Cannot connect to the PowerView hub",
|
||||
"hub_http_error": "Hub returned an HTTP error",
|
||||
"hub_timeout": "Connection to hub timed out",
|
||||
"hub_protocol_error": "Hub returned an unexpected response"
|
||||
},
|
||||
"abort": {
|
||||
"already_configured": "Device is already configured",
|
||||
"no_devices_found": "No devices found on the network",
|
||||
"not_supported": "Device not supported"
|
||||
},
|
||||
"flow_title": "Setup {name}",
|
||||
"step": {
|
||||
"bluetooth_confirm": {
|
||||
"description": "Do you want to set up {name}?"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,12 @@
|
||||
* Emulate a Hunter Douglas PowerView cover device using ESP32
|
||||
* used e.g. to gain the home_key from an existing installation via BLE
|
||||
*
|
||||
* REQUIRES:
|
||||
* - ESP32 Board Definitions 3.0.x (tested on 3.0.7)
|
||||
* - WolfSSL 5.7.x (tested on 5.7.6)
|
||||
* - Phone Region: Potentially an alternative region depending on the app response
|
||||
* - e.g. To add Parkland shades in the US, phone region set to the UK temporarily
|
||||
*
|
||||
* TODO:
|
||||
* - cleanup code
|
||||
* - think about emulating a remote
|
||||
@@ -11,9 +17,13 @@
|
||||
*/
|
||||
|
||||
#define NAME "myPVcover"
|
||||
#define FW_VERSION "391"
|
||||
#define SERIAL_NR "01234567890ABCDEF"
|
||||
|
||||
const uint16_t SW_VERSION = 391;
|
||||
const char *SERIAL_NR = "01234567890ABCDEF";
|
||||
const uint16_t TYP_ID = 42; // 62
|
||||
const uint16_t MODEL_ID = 224;
|
||||
const uint16_t FW_REVISION = 27;
|
||||
const uint32_t HW_REVISION = 171103;
|
||||
const uint8_t BATTERY_LEVEL = 42;
|
||||
|
||||
#include <BLEDevice.h>
|
||||
#include <BLEServer.h>
|
||||
@@ -40,7 +50,11 @@ int devId = INVALID_DEVID; //if not using async INVALID_DEVID is default
|
||||
|
||||
#define DEV_SERVICE_UUID BLEUUID("180A")
|
||||
#define SER_CHAR_UUID BLEUUID("2A25")
|
||||
#define SWC_CHAR_UUID BLEUUID("2A28")
|
||||
#define MAN_CHAR_UUID BLEUUID("2A29")
|
||||
#define MOD_CHAR_UUID BLEUUID("2A24")
|
||||
#define FWR_CHAR_UUID BLEUUID("2A26")
|
||||
#define HWR_CHAR_UUID BLEUUID("2A27")
|
||||
#define SWR_CHAR_UUID BLEUUID("2A28")
|
||||
|
||||
#define BAT_SERVICE_UUID BLEUUID("180F")
|
||||
#define BAT_CHAR_UUID BLEUUID("2A19")
|
||||
@@ -69,7 +83,7 @@ struct notification {
|
||||
};
|
||||
|
||||
BLECharacteristic *pCharacteristic_cover, *pCharacteristic_fw, *pCharacteristic_unknown, *pCharacteristic_bat;
|
||||
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser;
|
||||
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser, *pCharacteristic_man, *pCharacteristic_mod, *pCharacteristic_fwr, *pCharacteristic_hwr;
|
||||
BLEServer *pServer = NULL;
|
||||
bool deviceConnected = false;
|
||||
bool oldDeviceConnected = false;
|
||||
@@ -158,12 +172,12 @@ void decode(BLECharacteristic *pChar) {
|
||||
memcpy((void *)&msg, data_dec, 4);
|
||||
Serial.printf("\t message: SRV: %02x, CMD %02x, SEQ %i, LEN %i\n", msg.serviceID, msg.cmdID, msg.sequence, msg.data_len);
|
||||
|
||||
// sepecial responses (static data!)
|
||||
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
|
||||
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x2a, 0xe0, 0x08 }; // HW diagnostics
|
||||
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
|
||||
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
|
||||
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
|
||||
// special responses (static data!)
|
||||
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
|
||||
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, (uint8_t)(SW_VERSION & 0xFF), (uint8_t)(SW_VERSION >> 8), 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, TYP_ID, MODEL_ID, 0x08 }; // HW diagnostics
|
||||
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
|
||||
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
|
||||
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
|
||||
|
||||
Serial.print("\t\t");
|
||||
switch ((msg.serviceID << 8) | msg.cmdID) {
|
||||
@@ -179,7 +193,7 @@ void decode(BLECharacteristic *pChar) {
|
||||
break;
|
||||
case 0xF711:
|
||||
// identify
|
||||
Serial.printf("identify: %i\n", data_dec[4]);
|
||||
Serial.printf("identify: %i times\n", data_dec[4]);
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
break;
|
||||
case 0xF7B8:
|
||||
@@ -208,8 +222,8 @@ void decode(BLECharacteristic *pChar) {
|
||||
case 0xFB02:
|
||||
// set shade key
|
||||
Serial.print("set shade key: ");
|
||||
print_hex(&data_raw[4], data_len - 4, "\\x");
|
||||
// set resonse before key, to acknowledge unencrypted
|
||||
print_hex(&data_raw[4], data_len - 4, "\\x", "");
|
||||
// set response before key, to acknowledge unencrypted
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
if (msg.data_len == 16) {
|
||||
memcpy(home_key, &data_raw[4], 16);
|
||||
@@ -251,7 +265,9 @@ void decode(BLECharacteristic *pChar) {
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
break;
|
||||
default:
|
||||
Serial.println(F("*********************************** unknown message"));
|
||||
Serial.println(F("*********************************** unknown message (try ACK)"));
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
break;
|
||||
}
|
||||
if (resp_size) {
|
||||
pChar->setValue((uint8_t *)&response, resp_size);
|
||||
@@ -291,7 +307,7 @@ class coverCallbacks : public BLECharacteristicCallbacks {
|
||||
void onNotify(BLECharacteristic *pCharacteristic) {
|
||||
Serial.printf("Cover onNotify() %s\n", pCharacteristic->toString().c_str());
|
||||
}
|
||||
|
||||
|
||||
void onStatus(BLECharacteristic *pCharacteristic, Status s, uint32_t code) {
|
||||
Serial.printf("Cover onStatus() %s: %s\n", BLEstate[s], pCharacteristic->toString().c_str());
|
||||
}
|
||||
@@ -341,6 +357,7 @@ class genericCallbacks : public BLECharacteristicCallbacks {
|
||||
|
||||
void setup() {
|
||||
Serial.begin(115200);
|
||||
delay(1000); // wait for terminal to be ready
|
||||
Serial.println(NAME " initializing ...");
|
||||
|
||||
BLEDevice::init(NAME);
|
||||
@@ -373,8 +390,7 @@ pDesc1->setValue("cover");*/
|
||||
BAT_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_bat->setCallbacks(new batteryCallbacks());
|
||||
uint8_t battery_level = 42;
|
||||
pCharacteristic_bat->setValue(&battery_level, 1);
|
||||
pCharacteristic_bat->setValue((uint8_t *)&BATTERY_LEVEL, 1);
|
||||
pCharacteristic_bat->addDescriptor(new BLE2902());
|
||||
|
||||
BLEService *pFWService = pServer->createService(FW_SERVICE_UUID);
|
||||
@@ -390,9 +406,9 @@ pDesc1->setValue("cover");*/
|
||||
|
||||
BLEService *pDEVService = pServer->createService(DEV_SERVICE_UUID);
|
||||
pCharacteristic_dev = pDEVService->createCharacteristic(
|
||||
SWC_CHAR_UUID,
|
||||
SWR_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_dev->setValue(FW_VERSION);
|
||||
pCharacteristic_dev->setValue(String(SW_VERSION));
|
||||
pCharacteristic_dev->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_ser = pDEVService->createCharacteristic(
|
||||
SER_CHAR_UUID,
|
||||
@@ -400,6 +416,28 @@ pDesc1->setValue("cover");*/
|
||||
pCharacteristic_ser->setValue(SERIAL_NR);
|
||||
pCharacteristic_ser->setCallbacks(new genericCallbacks());
|
||||
|
||||
pCharacteristic_man = pDEVService->createCharacteristic(
|
||||
MAN_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_man->setValue("Hunter Douglas");
|
||||
pCharacteristic_man->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_mod = pDEVService->createCharacteristic(
|
||||
MOD_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_mod->setValue(String(TYP_ID));
|
||||
pCharacteristic_mod->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_fwr = pDEVService->createCharacteristic(
|
||||
FWR_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_fwr->setValue(String(FW_REVISION));
|
||||
pCharacteristic_fwr->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_hwr = pDEVService->createCharacteristic(
|
||||
HWR_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_hwr->setValue(String(HW_REVISION));
|
||||
pCharacteristic_hwr->setCallbacks(new genericCallbacks());
|
||||
|
||||
|
||||
// Start the services
|
||||
pCovService->start();
|
||||
pFWService->start();
|
||||
@@ -408,9 +446,10 @@ pDesc1->setValue("cover");*/
|
||||
|
||||
// Start advertising
|
||||
BLEAdvertisementData AdvertisementData;
|
||||
const String manufacturerData = String("\x19\x08\x00\x00\x2A\x00\x00\x00\x00\x00\xA2", 11);
|
||||
// Hunter Douglas ^^--^^ ^^key^^ ^^ ID-Type
|
||||
AdvertisementData.setManufacturerData(manufacturerData);
|
||||
const char adv[] = { 0x19, 0x08, 0x00, 0x00, TYP_ID, 0x00, 0x00, 0x00, 0x00, 0x00, 0xA2 };
|
||||
// Hunter Douglas ^^ -- ^^ ^^key ^^ ^--pos1--^
|
||||
|
||||
AdvertisementData.setManufacturerData(String(adv, 11));
|
||||
AdvertisementData.setPartialServices(BLEUUID(COVER_SERVICE_UUID));
|
||||
AdvertisementData.setFlags((1 << 2) | (1 << 1)); // [BR/EDR Not Supported] | [LE General Discoverable Mode]
|
||||
|
||||
|
||||
BIN
img/1200x630wa.png
Normal file
BIN
img/1200x630wa.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 40 KiB |
BIN
img/icon.png
Normal file
BIN
img/icon.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 11 KiB |
BIN
img/icon@2x.png
Normal file
BIN
img/icon@2x.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 29 KiB |
BIN
img/icon_full.png
Normal file
BIN
img/icon_full.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 31 KiB |
BIN
img/logo.png
Normal file
BIN
img/logo.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 44 KiB |
BIN
img/logo@2x.png
Normal file
BIN
img/logo@2x.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 53 KiB |
@@ -1,7 +1,17 @@
|
||||
# pyproject.toml
|
||||
|
||||
[project]
|
||||
requires-python = ">=3.12.0"
|
||||
name = "hunterdouglas_powerview_ble"
|
||||
classifiers = [
|
||||
"License :: OSI Approved :: Apache Software License",
|
||||
"Programming Language :: Python :: 3.13"
|
||||
]
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13.2"
|
||||
|
||||
[project.urls]
|
||||
"Source Code" = "https://github.com/patman15/hdpv_ble/"
|
||||
"Bug Reports" = "https://github.com/patman15/hdpv_ble/issues"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
minversion = "8.0"
|
||||
@@ -14,9 +24,9 @@ testpaths = [
|
||||
]
|
||||
asyncio_mode = "auto"
|
||||
|
||||
# ruff configuration taken from HA 2024.11.2 (less ignores)
|
||||
# ruff settings from HA 2025.2.2
|
||||
[tool.ruff]
|
||||
required-version = ">=0.6.8"
|
||||
required-version = ">=0.9.1"
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = [
|
||||
@@ -35,8 +45,10 @@ select = [
|
||||
"B017", # pytest.raises(BaseException) should be considered evil
|
||||
"B018", # Found useless attribute access. Either assign it to a variable or remove it.
|
||||
"B023", # Function definition does not bind loop variable {name}
|
||||
"B024", # `{name}` is an abstract base class, but it has no abstract methods or properties
|
||||
"B026", # Star-arg unpacking after a keyword argument is strongly discouraged
|
||||
"B032", # Possible unintentional type annotation (using :). Did you mean to assign (using =)?
|
||||
"B035", # Dictionary comprehension uses static key
|
||||
"B904", # Use raise from to specify exception cause
|
||||
"B905", # zip() without an explicit strict= parameter
|
||||
"BLE",
|
||||
@@ -70,12 +82,27 @@ select = [
|
||||
"RSE", # flake8-raise
|
||||
"RUF005", # Consider iterable unpacking instead of concatenation
|
||||
"RUF006", # Store a reference to the return value of asyncio.create_task
|
||||
"RUF007", # Prefer itertools.pairwise() over zip() when iterating over successive pairs
|
||||
"RUF008", # Do not use mutable default values for dataclass attributes
|
||||
"RUF010", # Use explicit conversion flag
|
||||
"RUF013", # PEP 484 prohibits implicit Optional
|
||||
"RUF016", # Slice in indexed access to type {value_type} uses type {index_type} instead of an integer
|
||||
"RUF017", # Avoid quadratic list summation
|
||||
"RUF018", # Avoid assignment expressions in assert statements
|
||||
"RUF019", # Unnecessary key check before dictionary access
|
||||
# "RUF100", # Unused `noqa` directive; temporarily every now and then to clean them up
|
||||
"RUF020", # {never_like} | T is equivalent to T
|
||||
"RUF021", # Parenthesize a and b expressions when chaining and and or together, to make the precedence clear
|
||||
"RUF022", # Sort __all__
|
||||
"RUF023", # Sort __slots__
|
||||
"RUF024", # Do not pass mutable objects as values to dict.fromkeys
|
||||
"RUF026", # default_factory is a positional-only argument to defaultdict
|
||||
"RUF030", # print() call in assert statement is likely unintentional
|
||||
"RUF032", # Decimal() called with float literal argument
|
||||
"RUF033", # __post_init__ method with argument defaults
|
||||
"RUF034", # Useless if-else condition
|
||||
"RUF100", # Unused `noqa` directive
|
||||
"RUF101", # noqa directives that use redirected rule codes
|
||||
"RUF200", # Failed to parse pyproject.toml: {message}
|
||||
"S102", # Use of exec detected
|
||||
"S103", # bad-file-permissions
|
||||
"S108", # hardcoded-temp-file
|
||||
@@ -88,7 +115,7 @@ select = [
|
||||
"S317", # suspicious-xml-sax-usage
|
||||
"S318", # suspicious-xml-mini-dom-usage
|
||||
"S319", # suspicious-xml-pull-dom-usage
|
||||
"S320", # suspicious-xmle-tree-usage
|
||||
# "S320", # suspicious-xmle-tree-usage
|
||||
"S601", # paramiko-call
|
||||
"S602", # subprocess-popen-with-shell-equals-true
|
||||
"S604", # call-with-shell-equals-true
|
||||
@@ -99,7 +126,7 @@ select = [
|
||||
"SLOT", # flake8-slots
|
||||
"T100", # Trace found: {name} used
|
||||
"T20", # flake8-print
|
||||
"TCH", # flake8-type-checking
|
||||
"TC", # flake8-type-checking
|
||||
"TID", # Tidy imports
|
||||
"TRY", # tryceratops
|
||||
"UP", # pyupgrade
|
||||
@@ -119,13 +146,12 @@ ignore = [
|
||||
# "PLC1901", # {existing} can be simplified to {replacement} as an empty string is falsey; too many false positives
|
||||
# "PLR0911", # Too many return statements ({returns} > {max_returns})
|
||||
# "PLR0912", # Too many branches ({branches} > {max_branches})
|
||||
# "PLR0913", # Too many arguments to function call ({c_args} > {max_args})
|
||||
"PLR0913", # Too many arguments to function call ({c_args} > {max_args})
|
||||
# "PLR0915", # Too many statements ({statements} > {max_statements})
|
||||
"PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable
|
||||
# "PLW2901", # Outer {outer_kind} variable {name} overwritten by inner {inner_kind} target
|
||||
# "PT004", # Fixture {fixture} does not return anything, add leading underscore
|
||||
# "PT011", # pytest.raises({exception}) is too broad, set the `match` parameter or use a more specific exception
|
||||
# "PT018", # Assertion should be broken down into multiple parts
|
||||
"PT018", # Assertion should be broken down into multiple parts
|
||||
# "RUF001", # String contains ambiguous unicode character.
|
||||
# "RUF002", # Docstring contains ambiguous unicode character.
|
||||
# "RUF003", # Comment contains ambiguous unicode character.
|
||||
@@ -136,14 +162,12 @@ ignore = [
|
||||
# "SIM115", # Use context handler for opening files
|
||||
|
||||
# Moving imports into type-checking blocks can mess with pytest.patch()
|
||||
"TCH001", # Move application import {} into a type-checking block
|
||||
"TCH002", # Move third-party import {} into a type-checking block
|
||||
"TCH003", # Move standard library import {} into a type-checking block
|
||||
"TC001", # Move application import {} into a type-checking block
|
||||
"TC002", # Move third-party import {} into a type-checking block
|
||||
"TC003", # Move standard library import {} into a type-checking block
|
||||
|
||||
"TRY003", # Avoid specifying long messages outside the exception class
|
||||
"TRY400", # Use `logging.exception` instead of `logging.error`
|
||||
# Ignored due to performance: https://github.com/charliermarsh/ruff/issues/2923
|
||||
"UP038", # Use `X | Y` in `isinstance` call instead of `(X, Y)`
|
||||
|
||||
# May conflict with the formatter, https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
|
||||
"W191",
|
||||
@@ -161,3 +185,14 @@ ignore = [
|
||||
"PLE0605"
|
||||
]
|
||||
|
||||
|
||||
[tool.ruff.lint.isort]
|
||||
force-sort-within-sections = true
|
||||
known-first-party = [
|
||||
"homeassistant",
|
||||
]
|
||||
combine-as-imports = true
|
||||
split-on-trailing-comma = false
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"scripts/*" = ["T201"]
|
||||
@@ -1,4 +1,6 @@
|
||||
homeassistant==2024.11.0
|
||||
homeassistant==2025.11.0
|
||||
pip>=21.3.1
|
||||
ruff==0.6.8
|
||||
|
||||
ruff>=0.9.1,<=0.15.0
|
||||
types-requests
|
||||
mypy~=1.19.1
|
||||
codespell
|
||||
|
||||
@@ -2,9 +2,9 @@
|
||||
|
||||
wheel
|
||||
home-assistant-bluetooth
|
||||
habluetooth>=3.6.0
|
||||
habluetooth>=5.3.0
|
||||
bluetooth-adapters
|
||||
pytest>=8.3.3
|
||||
pytest>=8.4.1
|
||||
pytest-cov>=5.0.0
|
||||
pytest-socket>=0.7.0
|
||||
pytest-asyncio>=0.24.0
|
||||
@@ -16,10 +16,10 @@ aiohttp
|
||||
aiohttp_cors
|
||||
aiohttp-fast-url-dispatcher
|
||||
aiohttp-zlib-ng
|
||||
bleak>=0.22.3
|
||||
bleak-retry-connector>=3.6.0
|
||||
bleak>=1.0.1
|
||||
bleak-retry-connector>=4.4.3
|
||||
bluetooth-data-tools
|
||||
pyserial-asyncio
|
||||
pyudev
|
||||
pytest-homeassistant-custom-component==0.13.181
|
||||
pytest-homeassistant-custom-component==0.13.294
|
||||
|
||||
|
||||
1
scripts/__init__.py
Normal file
1
scripts/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Script to extract PowerView homekey from a G3 PowerView Gateway."""
|
||||
119
scripts/extract_gateway3_homekey.py
Normal file
119
scripts/extract_gateway3_homekey.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""Extract PowerView homekey from a G3 PowerView Gateway."""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import struct
|
||||
from typing import Any, Final
|
||||
|
||||
import requests
|
||||
|
||||
HUB: Final[str] = "http://powerview-g3.local"
|
||||
TIMEOUT: Final[int] = 10
|
||||
|
||||
|
||||
def create_request(sid: int, cid: int, sequence_id: int, data: bytes) -> bytes:
|
||||
"""Assemble a request frame for the PowerView protocol."""
|
||||
return struct.pack("<BBBB", sid, cid, sequence_id, len(data)) + data
|
||||
|
||||
|
||||
def decode_response(packet: bytes) -> dict[str, Any]:
|
||||
"""Decode a response frame from the PowerView protocol."""
|
||||
if len(packet) < 4:
|
||||
raise ValueError("Packet size too small")
|
||||
sid, cid, sequence_id, length = struct.unpack("<BBBB", packet[0:4])
|
||||
if len(packet) != 4 + length:
|
||||
raise ValueError("Not all data present")
|
||||
if length < 1:
|
||||
raise ValueError("No errorCode present")
|
||||
(error_code,) = struct.unpack("<B", packet[4:5])
|
||||
data: Final[bytes] = packet[5:]
|
||||
return {
|
||||
"cid": cid,
|
||||
"sid": sid,
|
||||
"sequenceId": sequence_id,
|
||||
"errorCode": error_code,
|
||||
"data": data,
|
||||
}
|
||||
|
||||
|
||||
def create_get_shade_key_request(sequence_id) -> bytes:
|
||||
"""Create a GetShadeKey request frame."""
|
||||
return create_request(251, 18, sequence_id, b"")
|
||||
|
||||
|
||||
def get_shade_key(hub: str, ble_name) -> bytes:
|
||||
"""Get the homekey for a shade."""
|
||||
try:
|
||||
shades_exec_resp: requests.Response = requests.post(
|
||||
hub + "/home/shades/exec?shades=" + ble_name,
|
||||
json={"hex": create_get_shade_key_request(1).hex()},
|
||||
timeout=TIMEOUT,
|
||||
)
|
||||
shades_exec_resp.raise_for_status()
|
||||
except requests.exceptions.RequestException as ex:
|
||||
print(f"Unable to send GetShadeKey {ex!s}")
|
||||
raise
|
||||
|
||||
result: dict = json.loads(shades_exec_resp.content)
|
||||
responses = result.get("responses", [])
|
||||
if len(responses) != 1 or "hex" not in responses[0]:
|
||||
raise OSError(f"Error when attempting GetShadeKey: {result}")
|
||||
response: Final[bytes] = bytes.fromhex(responses[0]["hex"])
|
||||
dec_resp: Final[dict[str, Any]] = decode_response(response)
|
||||
if dec_resp["errorCode"] != 0:
|
||||
raise ValueError(
|
||||
f"BLE errorCode={dec_resp['errorCode']} data={dec_resp['data'].hex()}"
|
||||
)
|
||||
if len(dec_resp["data"]) != 16:
|
||||
raise ValueError("Expected 16 byte homekey")
|
||||
return dec_resp["data"]
|
||||
|
||||
|
||||
def main(hub: str) -> int:
|
||||
"""Extract the homekeys from all shades."""
|
||||
try:
|
||||
shades_resp: requests.Response = requests.get(
|
||||
hub + "/home/shades", timeout=TIMEOUT
|
||||
)
|
||||
shades_resp.raise_for_status()
|
||||
except requests.exceptions.RequestException as ex:
|
||||
print(f"Unable to get list of shades:\n\t{ex!s}")
|
||||
return -1
|
||||
|
||||
shades = json.loads(shades_resp.content)
|
||||
print(f"Found {len(shades)} shades, interrogating")
|
||||
network_key: bytes | None = None
|
||||
for shade in shades:
|
||||
name: str = base64.b64decode(shade["name"]).decode("utf-8")
|
||||
try:
|
||||
key: bytes = get_shade_key(hub, shade["bleName"])
|
||||
network_key = key
|
||||
except (OSError, ValueError) as ex:
|
||||
if network_key is not None:
|
||||
key = network_key
|
||||
print(f"Shade '{name}':")
|
||||
print(f"\tBLE name: '{shade['bleName']}'")
|
||||
print(f"\tHomeKey: {key.hex()} (shade unreachable, using network key)")
|
||||
else:
|
||||
print(f"Shade '{name}':")
|
||||
print(f"\tBLE name: '{shade['bleName']}'")
|
||||
print(f"\tHomeKey: ERROR - {ex}")
|
||||
continue
|
||||
|
||||
print(f"Shade '{name}':")
|
||||
print(f"\tBLE name: '{shade['bleName']}'")
|
||||
print(f"\tHomeKey: {key.hex()}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Extract PowerView homekey from a G3 PowerView Gateway"
|
||||
)
|
||||
parser.add_argument("hub", nargs="?", help="URL to HUB", default=HUB)
|
||||
args = parser.parse_args()
|
||||
sys.exit(main(**vars(args)))
|
||||
Reference in New Issue
Block a user