11 Commits

Author SHA1 Message Date
patman15
49bb1e79c9 add tilt functions 2025-01-07 09:50:41 +01:00
patman15
d73bb1d1c4 completed tilt control 2025-01-06 19:46:52 +01:00
patman15
e00e04159a support setting tilt 2025-01-05 19:31:26 +01:00
patman15
ea33e08da0 extend set_position() for all values 2025-01-05 12:12:16 +01:00
patman15
8c5c3d3c9a Update .gitignore 2024-12-29 13:00:15 +01:00
patman15
0299a4ed24 Merge branch 'main' into feature/type_8 2024-12-29 12:49:44 +01:00
patman15
21d31ba38a Merge branch 'main' into feature/type_8 2024-12-22 13:36:41 +01:00
patman15
b6b7e43b02 stronger typing 2024-12-22 13:35:29 +01:00
patman15
b550f98e2b Merge branch 'main' into feature/type_8 2024-12-21 09:10:50 +01:00
patman15
f6460280db Merge branch 'main' into feature/type_8 2024-12-20 21:16:37 +01:00
patman15
65b7b3814b disabled home_id filter 2024-12-20 20:41:01 +01:00
27 changed files with 180 additions and 1084 deletions

View File

@@ -8,8 +8,8 @@ on:
- cron: '0 5 * * 6'
jobs:
lint:
name: "Lint the code"
ruff:
name: "Ruff"
runs-on: "ubuntu-latest"
steps:
- name: "Checkout the repository"
@@ -18,17 +18,11 @@ jobs:
- name: "Set up Python"
uses: actions/setup-python@main
with:
python-version: "3.13.2"
python-version: "3.12"
cache: "pip"
- name: "Install requirements"
run: python3 -m pip install -r requirements.txt
- name: "Run ruff"
run: ruff check .
- name: "Run mypy"
run: mypy .
- name: "Run codespell"
run: codespell -L hass
- name: "Run Ruff"
run: python3 -m ruff check .

3
.gitignore vendored
View File

@@ -152,3 +152,6 @@ cython_debug/
#.idea/
aes.py
*.bak
emu/PV_BLE_cover/PV_BLE_cover.ino
img/
*.zip

View File

@@ -8,7 +8,8 @@
> [!WARNING]
> - This integration is under development!
> - Test coverage is low, malfunction might occur.
> - The HOME_KEY is lost over updates!
> - Only devices that are **not** added to the app are controlable. It is possible to add them to the app if you just want to monitor the status (position, battery) in Home Assistant.
> - Currently only position change is supported (e.g., no tilt)
## Features
- Zero configuration
@@ -17,7 +18,7 @@
### Supported Devices
Type* | Description
-- | --
-- | --
1 | Designer Roller
4 | Roman
5 | Bottom Up
@@ -25,7 +26,6 @@ Type* | Description
10 | Duette and Applause SkyLift
19 | Provenance Woven Wood
31, 32, 84 | Vignette
39 | Parkland
42 | M25T Roller Blind
49 | AC Roller
52 | Banded Shades
@@ -39,14 +39,10 @@ The integration provides the following information about the battery
Platform | Description | Unit | Details
-- | -- | -- | --
`binary_sensor` | battery charging indicator | `bool` | true if battery is charging
`button` | identify shade | - | identify shade by LED and 3 beeps
`cover` | view/control position | `%` | percentage cover is open (100% is open)
`sensor` | SoC (state of charge) | `%` | range 100% (full), 50%, 20%, 0% (battery empty)
## Installation
> [!IMPORTANT]
> In case you added your shades to the app or a gateway, you need to [set the encryption key](#set-the-encryption-key) manually in the [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py) file after **each** update!
### Automatic
Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom repository](https://hacs.xyz/docs/faq/custom_repositories/).
@@ -61,22 +57,12 @@ Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom rep
1. Restart Home Assistant
1. In the HA UI go to "Configuration" -> "Integrations" click "+" and search for "Hunter Douglas PowerView (BLE)"
## Set the Encryption Key
Currently, there are three methods to obtain the key:
1. Via adopting a BLE shade: There is a [shade emulator](/emu/PV_BLE_cover) that works with Arduino IDE and an ESP32 device (≥ 2MiB flash, ≥ 128KiB required), e.g. [Adafruit QT Py ESP32-S3](https://www.adafruit.com/product/5426). Install and connect via serial port, then go to the PowerView app and add the shade `myPVcover` to your home. You will see a log message `set shade key: \xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx` . Copy this key. You can delete the shade from the app when done.
2. Extracting from gateway: This [script](scripts/extract_gateway3_homekey.py) is able to extract the key from a working PowerView gateway.
3. Grabbing from the app: Checkout this [post in the Home Assistant community forum](https://community.home-assistant.io/t/hunter-douglas-powerview-gen-3-integration/424836/228).
Finally, you need to manually copy the key to [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py).
> [!IMPORTANT]
> You need to update the file after **each** update!
## Known Issues
<details><summary>Shade inoperable after charging</summary>
It seems that the shades require some re-initialization after charging. The solution is currently unknown, but as a workaround you can operate the shade ones using the vendor app.
</details>
## Outlook
- Add support for encryption
- Allow parallel usage to PowerView app as "remote"
- Add support for tilt function
- Add support for further device types
## Troubleshooting
In case you have severe troubles,
@@ -86,15 +72,6 @@ In case you have severe troubles,
- disable the log (Home Assistant will prompt you to download the log), and finally
- [open an issue](https://github.com/patman15/hdpv_ble/issues/new?assignees=&labels=Bug&projects=&template=bug.yml) with a good description of what happened and attach the log.
# Thanks To
[@mannkind](https://github.com/mannkind)
[license-shield]: https://img.shields.io/github/license/patman15/hdpv_ble.svg?style=for-the-badge
[releases-shield]: https://img.shields.io/github/release/patman15/hdpv_ble.svg?style=for-the-badge
[releases]: https://github.com//patman15/hdpv_ble/releases
## Outlook
- Add tests!
- Allow parallel usage to PowerView app as "remote"
- Add support for tilt function
- Add support for further device types

View File

@@ -4,24 +4,17 @@
@license: Apache-2.0 license
"""
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError
from homeassistant.components.bluetooth import async_ble_device_from_address
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
from .const import LOGGER
from .const import DOMAIN, LOGGER
from .coordinator import PVCoordinator
PLATFORMS: list[Platform] = [
Platform.BINARY_SENSOR,
Platform.COVER,
Platform.SENSOR,
Platform.BUTTON,
]
PLATFORMS: list[Platform] = [Platform.BINARY_SENSOR, Platform.COVER, Platform.SENSOR]
type ConfigEntryType = ConfigEntry[PVCoordinator]
@@ -33,7 +26,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool
if entry.unique_id is None:
raise ConfigEntryError("Missing unique ID for device.")
ble_device: BLEDevice | None = async_ble_device_from_address(
ble_device = async_ble_device_from_address(
hass=hass, address=entry.unique_id, connectable=True
)
@@ -42,12 +35,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool
f"Could not find PowerView device ({entry.unique_id}) via Bluetooth"
)
coordinator = PVCoordinator(hass, ble_device, entry.data.copy(), entry.title)
coordinator = PVCoordinator(hass, ble_device, entry.data.copy())
try:
await coordinator.query_dev_info()
except BleakError as err:
raise ConfigEntryNotReady("Unable to query device info.") from err
# Insert the coordinator in the global registry
hass.data.setdefault(DOMAIN, {})
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(coordinator.async_start())

View File

@@ -1,9 +1,9 @@
"""Hunter Douglas PowerView BLE API."""
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
import time
from typing import Final
from bleak import BleakClient
@@ -16,7 +16,6 @@ from cryptography.hazmat.primitives.ciphers.base import (
AEADDecryptionContext,
AEADEncryptionContext,
)
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
@@ -51,7 +50,6 @@ SHADE_TYPE: Final[dict[int, str]] = {
8: "Duette, Top Down Bottom Up",
9: "Duette DuoLite, Top Down Bottom Up",
33: "Duette Architella, Top Down Bottom Up",
39: "Parkland",
47: "Pleated, Top Down Bottom Up",
# top down, tilt anywhere
51: "Venetian, Tilt Anywhere",
@@ -97,18 +95,26 @@ class PowerViewBLE:
def __init__(self, ble_device: BLEDevice, home_key: bytes = b"") -> None:
"""Initialize device API via Bluetooth."""
self._ble_device: BLEDevice = ble_device
self._ble_device: Final[BLEDevice] = ble_device
self.name: Final[str] = self._ble_device.name or "unknown"
self._seqcnt: int = 1
self._client: BleakClient = BleakClient(self._ble_device)
self._client: BleakClient = BleakClient(
self._ble_device,
disconnected_callback=self._on_disconnect,
services=[
UUID_COV_SERVICE,
# self.UUID_DEV_SERVICE,
# self.UUID_BAT_SERVICE,
],
)
self._data_event = asyncio.Event()
self._data: bytes = b""
self._data: bytearray = bytearray()
self._info: PVDeviceInfo = PVDeviceInfo()
self._is_encrypted: bool = False
self._cmd_lock: Final = asyncio.Lock()
self._cmd_next: tuple[ShadeCmd, bytes]
self._is_encrypted: bool = False
self._cipher: Final[Cipher | None] = (
Cipher(algorithms.AES(home_key), modes.CTR(bytes(16)))
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
if len(home_key) == 16
else None
)
@@ -117,10 +123,6 @@ class PowerViewBLE:
await self._data_event.wait()
self._data_event.clear()
def set_ble_device(self, ble_device: BLEDevice) -> None:
"""Update the BLE device reference (e.g. when proxy details change)."""
self._ble_device = ble_device
@property
def encrypted(self) -> bool:
"""Return whether communication with this shade is encrypted."""
@@ -130,11 +132,6 @@ class PowerViewBLE:
def encrypted(self, value: bool) -> None:
self._is_encrypted = value
@property
def has_key(self) -> bool:
"""Return True if a valid homekey was provided."""
return self._cipher is not None
@property
def info(self) -> PVDeviceInfo:
"""Return device information, e.g. SW version."""
@@ -196,7 +193,7 @@ class PowerViewBLE:
("position3", int(data[6])),
(ATTR_CURRENT_TILT_POSITION, int(data[7])),
("home_id", int.from_bytes(data[0:2], byteorder="little")),
("type_id", int(data[2])),
("type_id", int.from_bytes(data[2:3])),
("is_opening", bool(pos & 0x3 == 0x2)),
("is_closing", bool(pos & 0x3 == 0x1)),
("battery_charging", bool(pos & 0x3 == 0x3)), # observed
@@ -209,29 +206,27 @@ class PowerViewBLE:
async def set_position(
self,
pos1: int,
pos2: int = 0x8000,
pos3: int = 0x8000,
tilt: int = 0x8000,
pos2: int | None = None,
pos3: int | None = None,
tilt: int | None = None,
velocity: int = 0x0,
disconnect: bool = True,
) -> None:
"""Set position of device."""
LOGGER.debug(
"%s setting position to %i/%i/%i, tilt %i, velocity %s",
self.name,
pos1,
pos2,
pos3,
tilt,
velocity,
)
LOGGER.debug("%s setting position to %i, tilt %i", self.name, pos1, tilt)
await self._cmd(
(
ShadeCmd.SET_POSITION,
int.to_bytes(pos1 * 100, 2, byteorder="little")
+ int.to_bytes(pos2, 2, byteorder="little")
+ int.to_bytes(pos3, 2, byteorder="little")
+ int.to_bytes(tilt, 2, byteorder="little")
int.to_bytes(pos1, 2, byteorder="little")
+ int.to_bytes(
pos2 if pos2 is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(
pos3 if pos3 is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(
tilt if tilt is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(velocity, 1),
),
disconnect,
@@ -265,15 +260,14 @@ class PowerViewBLE:
),
)
async def identify(self, beeps: int = 0x3) -> None:
"""Identify device."""
LOGGER.debug("%s identify (%i)", self.name, beeps)
await self._cmd((ShadeCmd.IDENTIFY, bytes([min(beeps, 0xFF)])))
def _verify_response(self, data: bytes, seq_nr: int, cmd: ShadeCmd) -> bool:
def _verify_response(self, din: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
"""Verify shade response data."""
data: bytearray = din
if self._cipher is not None and self._is_encrypted:
dec: AEADDecryptionContext = self._cipher.decryptor()
data = bytearray(dec.update(din) + dec.finalize())
if len(data) < 4:
LOGGER.error("Response message too short")
LOGGER.error("Reponse message too short")
return False
if int.from_bytes(data[0:2], byteorder="little") != cmd.value & 0xFFEF:
LOGGER.warning("Response to wrong command")
@@ -329,10 +323,10 @@ class PowerViewBLE:
def _notification_handler(self, _sender, data: bytearray) -> None:
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
self._data = bytes(data)
self._data = data
if self._cipher is not None and self._is_encrypted:
dec: AEADDecryptionContext = self._cipher.decryptor()
self._data = bytes(dec.update(bytes(data)) + dec.finalize())
self._data = bytearray(dec.update(data) + dec.finalize())
LOGGER.debug(
"%s %s",
"decoded data: ".rjust(19 + len(self.name)),
@@ -356,7 +350,6 @@ class PowerViewBLE:
self._ble_device,
self.name,
disconnected_callback=self._on_disconnect,
ble_device_callback=lambda: self._ble_device,
services=[
UUID_COV_SERVICE,
UUID_DEV_SERVICE,

View File

@@ -40,9 +40,7 @@ async def async_setup_entry(
)
class PVBinarySensor(
PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySensorEntity
): # type: ignore[reportIncompatibleMethodOverride]
class PVBinarySensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySensorEntity): # type: ignore[reportIncompatibleMethodOverride]
"""The generic PV binary sensor implementation."""
def __init__(
@@ -51,7 +49,7 @@ class PVBinarySensor(
descr: BinarySensorEntityDescription,
unique_id: str,
) -> None:
"""Initialize PV binary sensor."""
"""Intialize PV binary sensor."""
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
self._attr_device_info = coord.device_info
self._attr_has_entity_name = True

View File

@@ -1,71 +0,0 @@
"""Hunter Douglas Powerview cover."""
from typing import Final
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.button import (
ButtonDeviceClass,
ButtonEntity,
ButtonEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo, format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DOMAIN, LOGGER
from .coordinator import PVCoordinator
BUTTONS_SHADE: Final = [
ButtonEntityDescription(
key="identify",
device_class=ButtonDeviceClass.IDENTIFY,
entity_category=EntityCategory.DIAGNOSTIC,
),
]
async def async_setup_entry(
_hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the demo cover platform."""
coordinator: PVCoordinator = config_entry.runtime_data
for descr in BUTTONS_SHADE:
async_add_entities([PowerViewButton(coordinator, descr)])
class PowerViewButton(PassiveBluetoothCoordinatorEntity[PVCoordinator], ButtonEntity): # type: ignore[reportIncompatibleVariableOverride]
"""Representation of a powerview shade."""
_attr_has_entity_name = True
_attr_device_class = ButtonDeviceClass.IDENTIFY
def __init__(
self,
coordinator: PVCoordinator,
description: ButtonEntityDescription,
) -> None:
"""Initialize the shade."""
self.entity_description = description
self._coord: PVCoordinator = coordinator
self._attr_device_info = self._coord.device_info
self._attr_unique_id = (
f"{DOMAIN}_{format_mac(self._coord.address)}_{ButtonDeviceClass.IDENTIFY}"
)
super().__init__(coordinator)
@property
def device_info(self) -> DeviceInfo: # type: ignore[reportIncompatibleVariableOverride]
"""Return the device_info of the device."""
return self._coord.device_info
async def async_press(self) -> None:
"""Handle the button press."""
LOGGER.debug("identify cover")
await self._coord.api.identify()

View File

@@ -1,15 +1,9 @@
"""Config flow for BLE Battery Management System integration."""
import asyncio
import base64
import contextlib
from dataclasses import dataclass
import struct
from typing import Any
import aiohttp
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.bluetooth import (
BluetoothServiceInfoBleak,
@@ -17,163 +11,19 @@ from homeassistant.components.bluetooth import (
)
from homeassistant.config_entries import ConfigFlowResult
from homeassistant.const import CONF_ADDRESS
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import (
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
# from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
from .api import UUID_COV_SERVICE as UUID
from .const import CONF_HOME_KEY, DOMAIN, LOGGER, MFCT_ID
def _needs_encryption(manufacturer_data_hex: str) -> bool:
"""Return True if the BLE advertisement indicates encryption (home_id != 0)."""
data = bytearray.fromhex(manufacturer_data_hex)
if len(data) < 2:
return False
home_id = int.from_bytes(data[0:2], byteorder="little")
return home_id != 0
@dataclass
class HubShadeInfo:
"""Shade metadata from the PowerView hub."""
name: str # Human-readable name (decoded from base64)
ble_name: str # BLE advertisement name, e.g. "DUE:94ED"
async def _fetch_shades_from_hub(
hass: HomeAssistant, hub_url: str
) -> list[HubShadeInfo]:
"""Fetch shade list with human-readable names from a PowerView G3 hub.
Raises aiohttp.ClientError on network errors.
Raises asyncio.TimeoutError on timeout.
"""
session = async_get_clientsession(hass)
timeout = aiohttp.ClientTimeout(total=10)
async with session.get(f"{hub_url}/home/shades", timeout=timeout) as resp:
resp.raise_for_status()
shades = await resp.json(content_type=None)
if not shades:
return []
hub_shades: list[HubShadeInfo] = []
for shade in shades:
ble_name = shade.get("bleName", "")
if not ble_name:
continue
name_b64 = shade.get("name", "")
try:
name = base64.b64decode(name_b64).decode("utf-8") if name_b64 else ble_name
except Exception: # noqa: BLE001
name = ble_name
hub_shades.append(HubShadeInfo(name=name, ble_name=ble_name))
return hub_shades
async def _fetch_key_and_shades_from_hub(
hass: HomeAssistant, hub_url: str
) -> tuple[bytes, list[HubShadeInfo]]:
"""Fetch 16-byte homekey and shade list from a PowerView G3 hub.
Returns (key, shade_list). The key is network-wide so any reachable shade
returns the same value. The shade list contains human-readable names that
can be used to label BLE-discovered devices.
Raises ValueError on protocol/key errors.
Raises aiohttp.ClientError on network errors.
Raises asyncio.TimeoutError on timeout.
"""
hub_shades = await _fetch_shades_from_hub(hass, hub_url)
if not hub_shades:
raise ValueError("No shades found on the hub")
session = async_get_clientsession(hass)
timeout = aiohttp.ClientTimeout(total=10)
# GetShadeKey BLE request: sid=251, cid=18, seqId=1, data_len=0
request_frame = struct.pack("<BBBB", 251, 18, 1, 0)
# Try each shade until one returns a valid key (some may be out of range)
last_error: Exception = ValueError("No shades responded")
for hs in hub_shades:
try:
async with session.post(
f"{hub_url}/home/shades/exec?shades={hs.ble_name}",
json={"hex": request_frame.hex()},
timeout=timeout,
) as resp:
resp.raise_for_status()
result = await resp.json(content_type=None)
except (TimeoutError, aiohttp.ClientError) as ex:
last_error = ex
continue
responses = result.get("responses", [])
if len(responses) != 1 or "hex" not in responses[0]:
continue
response_bytes = bytes.fromhex(responses[0]["hex"])
if len(response_bytes) < 5:
continue
_s, _c, _q, length = struct.unpack("<BBBB", response_bytes[0:4])
if len(response_bytes) != 4 + length:
continue
if response_bytes[4] != 0:
continue
key_data = response_bytes[5:]
if len(key_data) != 16:
continue
return key_data, hub_shades
raise ValueError(f"No reachable shade returned a valid key: {last_error}")
_HOMEKEY_SCHEMA = vol.Schema(
{
vol.Required("key_method", default="hub"): SelectSelector(
SelectSelectorConfig(
options=[
SelectOptionDict(
value="hub",
label="Fetch automatically from PowerView hub",
),
SelectOptionDict(
value="manual",
label="Enter key manually (32 hex characters)",
),
SelectOptionDict(
value="skip",
label="Skip (no key — controls disabled for encrypted shades)",
),
]
)
),
vol.Optional("hub_url", default="http://powerview-g3.local"): TextSelector(
TextSelectorConfig(type=TextSelectorType.URL)
),
vol.Optional("home_key", default=""): TextSelector(
TextSelectorConfig(type=TextSelectorType.TEXT)
),
}
)
from .const import DOMAIN, LOGGER, MFCT_ID
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for BT Battery Management System."""
VERSION = 1
MINOR_VERSION = 1
MINOR_VERSION = 0
@dataclass
class DiscoveredDevice:
@@ -187,83 +37,6 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self._discovered_device: ConfigFlow.DiscoveredDevice | None = None
self._discovered_devices: dict[str, ConfigFlow.DiscoveredDevice] = {}
self._manufacturer_data_hex: str = ""
self._device_name: str = ""
self._home_key: str = ""
self._hub_url: str = ""
self._hub_shades: list[HubShadeInfo] = []
def _create_entry(self) -> ConfigFlowResult:
"""Create the config entry with collected data."""
data: dict[str, str] = {
"manufacturer_data": self._manufacturer_data_hex,
CONF_HOME_KEY: self._home_key,
}
if self._hub_url:
data["hub_url"] = self._hub_url
return self.async_create_entry(
title=self._device_name,
data=data,
)
def _validate_manual_key(
self, user_input: dict[str, Any], errors: dict[str, str]
) -> bool:
"""Validate a manually entered hex key and store it.
Returns True on success, False on validation error.
"""
raw = user_input.get("home_key", "").strip()
if "\\x" in raw:
raw = raw.replace("\\x", "")
if len(raw) != 32:
errors["home_key"] = "invalid_key_length"
return False
try:
bytes.fromhex(raw)
except ValueError:
errors["home_key"] = "invalid_key_format"
return False
self._home_key = raw.lower()
return True
async def _validate_homekey_input(
self, user_input: dict[str, Any], errors: dict[str, str]
) -> bool:
"""Parse and validate homekey user_input, populating self state.
Returns True on success, False on validation error (errors dict is populated).
On skip, self._home_key is set to "".
"""
method = user_input.get("key_method", "skip")
if method == "skip":
self._home_key = ""
return True
if method == "manual":
return self._validate_manual_key(user_input, errors)
if method != "hub":
return False
hub_url = user_input.get("hub_url", "").rstrip("/")
_HUB_ERROR_MAP: dict[type[Exception], str] = {
aiohttp.ClientResponseError: "hub_http_error",
aiohttp.ClientConnectionError: "hub_connection_error",
TimeoutError: "hub_timeout",
ValueError: "hub_protocol_error",
}
try:
key, hub_shades = await _fetch_key_and_shades_from_hub(self.hass, hub_url)
except tuple(_HUB_ERROR_MAP) as ex:
errors["hub_url"] = _HUB_ERROR_MAP[type(ex)]
return False
self._home_key = key.hex()
self._hub_url = hub_url
self._hub_shades = hub_shades
return True
async def async_step_bluetooth(
self, discovery_info: BluetoothServiceInfoBleak
@@ -288,17 +61,10 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
LOGGER.debug("confirm step for %s", self._discovered_device.name)
if user_input is not None:
self._manufacturer_data_hex = (
self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()
return self.async_create_entry(
title=self._discovered_device.name,
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
)
self._device_name = self._discovered_device.name
# Unencrypted shades can skip the homekey step entirely
if not _needs_encryption(self._manufacturer_data_hex):
await self._resolve_friendly_name()
return self._create_entry()
return await self.async_step_homekey_bluetooth()
self._set_confirm_only()
@@ -307,152 +73,24 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
description_placeholders={"name": self._discovered_device.name},
)
async def async_step_homekey_bluetooth(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Configure homekey for a shade discovered via Bluetooth."""
# Reuse an existing key if another shade was already configured
existing = self._existing_home_key()
if existing and user_input is None:
self._home_key = existing
await self._resolve_friendly_name()
return self._create_entry()
errors: dict[str, str] = {}
if user_input is not None and await self._validate_homekey_input(
user_input, errors
):
# Use hub name for the entry title if available
friendly = self._hub_name_for(self._device_name)
if friendly:
self._device_name = friendly
return self._create_entry()
return self.async_show_form(
step_id="homekey_bluetooth",
data_schema=_HOMEKEY_SCHEMA,
errors=errors,
description_placeholders={
"name": self._device_name,
"hub_url_example": "http://powerview-g3.local",
},
)
def _existing_entry_value(self, key: str) -> str:
"""Return the first non-empty value for *key* across configured entries."""
for entry in self._async_current_entries():
if value := entry.data.get(key, ""):
return value
return ""
def _existing_home_key(self) -> str:
"""Return the home_key from any already-configured entry, or ''."""
return self._existing_entry_value(CONF_HOME_KEY)
async def _resolve_friendly_name(self) -> None:
"""Try to resolve BLE device name to hub friendly name."""
hub_url = self._hub_url or self._existing_entry_value("hub_url")
if not hub_url:
return
try:
shades = await _fetch_shades_from_hub(self.hass, hub_url)
for hs in shades:
if hs.ble_name == self._device_name:
self._device_name = hs.name
break
if not self._hub_url:
self._hub_url = hub_url
except (TimeoutError, aiohttp.ClientError, ValueError):
pass
def _hub_name_for(self, ble_name: str) -> str | None:
"""Return the human-readable hub name for a BLE name, or None."""
for hs in self._hub_shades:
if hs.ble_name == ble_name:
return hs.name
return None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the user step — reuse existing key or offer a menu."""
"""Handle the user step to pick discovered device."""
LOGGER.debug("user step")
existing = self._existing_home_key()
if existing:
self._home_key = existing
self._hub_url = self._hub_url or self._existing_entry_value("hub_url")
if self._hub_url and not self._hub_shades:
with contextlib.suppress(
TimeoutError, aiohttp.ClientError, ValueError
):
self._hub_shades = await _fetch_shades_from_hub(
self.hass, self._hub_url
)
return self.async_show_menu(
step_id="user",
menu_options=["select_device", "manual"],
)
return await self.async_step_homekey()
def _build_selected_entries(
self, user_input: dict[str, Any]
) -> list[dict[str, Any]]:
"""Build config entry data for each selected shade address."""
addresses: list[str] = user_input[CONF_ADDRESS]
if isinstance(addresses, str):
addresses = [addresses]
entries: list[dict[str, Any]] = []
for address in addresses:
device = self._discovered_devices[address]
ble_name = device.name
name = self._hub_name_for(ble_name) or ble_name
mfct_hex = device.discovery_info.manufacturer_data[MFCT_ID].hex()
entry_data: dict[str, str] = {
"manufacturer_data": mfct_hex,
CONF_HOME_KEY: self._home_key,
}
if self._hub_url:
entry_data["hub_url"] = self._hub_url
entries.append(
{
"address": address,
"name": name,
"data": entry_data,
}
)
return entries
async def async_step_select_device(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Select one or more BLE-discovered shades, or fall through to manual."""
LOGGER.debug("select_device step")
if user_input is not None:
entries = self._build_selected_entries(user_input)
# Kick off auto-add flows for all but the last shade
await asyncio.gather(
*(
self.hass.config_entries.flow.async_init(
DOMAIN,
context={"source": "auto_add"},
data=info,
)
for info in entries[:-1]
)
)
# Create the final entry normally (ends this flow)
last = entries[-1]
await self.async_set_unique_id(last["address"], raise_on_progress=False)
address = user_input[CONF_ADDRESS]
await self.async_set_unique_id(address, raise_on_progress=False)
self._abort_if_unique_id_configured()
self._device_name = last["name"]
self._manufacturer_data_hex = last["data"]["manufacturer_data"]
self.context["title_placeholders"] = {"name": self._device_name}
return self._create_entry()
self._discovered_device = self._discovered_devices[address]
self.context["title_placeholders"] = {"name": self._discovered_device.name}
return self.async_create_entry(
title=self._discovered_device.name,
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
)
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass, False):
@@ -471,96 +109,19 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
)
if not self._discovered_devices:
return await self.async_step_manual()
return self.async_abort(reason="no_devices_found")
titles: list[SelectOptionDict] = []
titles = []
for address, discovery in self._discovered_devices.items():
hub_name = self._hub_name_for(discovery.name)
label = f"{hub_name} ({discovery.name})" if hub_name else discovery.name
titles.append({"value": address, "label": label})
titles.append({"value": address, "label": discovery.name})
return self.async_show_form(
step_id="select_device",
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_ADDRESS): SelectSelector(
SelectSelectorConfig(options=titles, multiple=True)
SelectSelectorConfig(options=titles)
)
}
),
)
async def async_step_auto_add(
self, discovery_info: dict[str, Any]
) -> ConfigFlowResult:
"""Handle a shade queued from multi-select for individual setup."""
await self.async_set_unique_id(discovery_info["address"])
self._abort_if_unique_id_configured()
self._device_name = discovery_info["name"]
self._manufacturer_data_hex = discovery_info["data"]["manufacturer_data"]
self._home_key = discovery_info["data"].get(CONF_HOME_KEY, "")
self._hub_url = discovery_info["data"].get("hub_url", "")
self.context["title_placeholders"] = {"name": self._device_name}
return await self.async_step_auto_add_confirm()
async def async_step_auto_add_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm adding a shade discovered via multi-select."""
if user_input is not None:
return self._create_entry()
self._set_confirm_only()
return self.async_show_form(
step_id="auto_add_confirm",
description_placeholders={"name": self._device_name},
)
async def async_step_manual(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle manual entry of a BLE device address and name."""
if user_input is not None:
address = user_input[CONF_ADDRESS].upper().strip()
self._device_name = user_input["ble_name"].strip()
await self.async_set_unique_id(address, raise_on_progress=False)
self._abort_if_unique_id_configured()
self.context["title_placeholders"] = {"name": self._device_name}
self._manufacturer_data_hex = ""
return self._create_entry()
return self.async_show_form(
step_id="manual",
data_schema=vol.Schema(
{
vol.Required(CONF_ADDRESS): TextSelector(
TextSelectorConfig(type=TextSelectorType.TEXT)
),
vol.Required("ble_name"): TextSelector(
TextSelectorConfig(type=TextSelectorType.TEXT)
),
}
),
)
async def async_step_homekey(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Configure homekey — collected before device selection."""
errors: dict[str, str] = {}
if user_input is not None and await self._validate_homekey_input(
user_input, errors
):
return await self.async_step_select_device()
return self.async_show_form(
step_id="homekey",
data_schema=_HOMEKEY_SCHEMA,
errors=errors,
description_placeholders={
"hub_url_example": "http://powerview-g3.local",
},
)

View File

@@ -3,12 +3,25 @@
import logging
from typing import Final
# from bleak.uuids import normalize_uuid_str
# from homeassistant.const import ( # noqa: F401
# ATTR_BATTERY_CHARGING,
# ATTR_BATTERY_LEVEL,
# ATTR_TEMPERATURE,
# ATTR_VOLTAGE,
# )
DOMAIN: Final[str] = "hunterdouglas_powerview_ble"
LOGGER: Final = logging.getLogger(__package__)
MFCT_ID: Final[int] = 2073
TIMEOUT: Final[int] = 5
CONF_HOME_KEY: Final[str] = "home_key"
# put the key here, needs to be 16 bytes long, e.g.
# HOME_KEY: Final[bytes] = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
HOME_KEY: Final[bytes] = b""
# attributes (do not change)
ATTR_RSSI: Final[str] = "rssi"

View File

@@ -3,7 +3,6 @@
from typing import Any
from bleak.backends.device import BLEDevice
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth.const import DOMAIN as BLUETOOTH_DOMAIN
from homeassistant.components.bluetooth.passive_update_coordinator import (
@@ -13,35 +12,26 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import CONNECTION_BLUETOOTH, DeviceInfo
from .api import SHADE_TYPE, PowerViewBLE
from .const import ATTR_RSSI, CONF_HOME_KEY, DOMAIN, LOGGER
from .const import ATTR_RSSI, DOMAIN, HOME_KEY, LOGGER
class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
"""Update coordinator for a battery management system."""
def __init__(
self,
hass: HomeAssistant,
ble_device: BLEDevice,
data: dict[str, Any],
friendly_name: str | None = None,
self, hass: HomeAssistant, ble_device: BLEDevice, data: dict[str, Any]
) -> None:
"""Initialize BMS data coordinator."""
assert ble_device.name is not None
self._mac = ble_device.address
self._friendly_name = friendly_name or ble_device.name
home_key_hex: str = data.get(CONF_HOME_KEY, "")
home_key: bytes = (
bytes.fromhex(home_key_hex) if len(home_key_hex) == 32 else b""
)
self.api = PowerViewBLE(ble_device, home_key)
self.api = PowerViewBLE(ble_device, HOME_KEY)
self.data: dict[str, int | float | bool] = {}
self._manuf_dat = data.get("manufacturer_data")
self.dev_details: dict[str, str] = {}
LOGGER.debug(
"Initializing coordinator for %s (%s)",
self._friendly_name,
ble_device.name,
ble_device.address,
)
super().__init__(
@@ -59,15 +49,16 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
@property
def device_info(self) -> DeviceInfo:
"""Return detailed device information for GUI."""
LOGGER.debug("%s: device_info, %s", self._friendly_name, self.dev_details)
LOGGER.debug("%s: device_info, %s", self.name, self.dev_details)
return DeviceInfo(
identifiers={
(DOMAIN, self.address),
(DOMAIN, self.name),
(BLUETOOTH_DOMAIN, self.address),
},
connections={(CONNECTION_BLUETOOTH, self.address)},
name=self._friendly_name,
name=self.name,
configuration_url=None,
# properties used in GUI:
manufacturer="Hunter Douglas",
model=(
str(SHADE_TYPE.get(int(bytes.fromhex(self._manuf_dat)[2]), "unknown"))
@@ -89,7 +80,7 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
def _async_stop(self) -> None:
"""Shutdown coordinator and any connection."""
LOGGER.debug("%s: shutting down BMS device", self.name)
LOGGER.debug("%s: shuting down BMS device", self.name)
self.hass.async_create_task(self.api.disconnect())
super()._async_stop()
@@ -101,8 +92,10 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
) -> None:
"""Handle a Bluetooth event."""
# if not self.dev_details:
# self.hass.async_create_task(self._get_device_info())
LOGGER.debug("BLE event %s: %s", change, service_info.manufacturer_data)
self.api.set_ble_device(service_info.device)
self.data = {ATTR_RSSI: service_info.rssi}
if change == bluetooth.BluetoothChange.ADVERTISEMENT:
self.data.update(

View File

@@ -3,7 +3,6 @@
from typing import Any, Final
from bleak.exc import BleakError
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
@@ -22,7 +21,7 @@ from homeassistant.helpers.device_registry import DeviceInfo, format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .api import CLOSED_POSITION, OPEN_POSITION
from .const import DOMAIN, LOGGER
from .const import DOMAIN, HOME_KEY, LOGGER
from .coordinator import PVCoordinator
@@ -34,14 +33,11 @@ async def async_setup_entry(
"""Set up the demo cover platform."""
coordinator: PVCoordinator = config_entry.runtime_data
model: Final[str | None] = coordinator.dev_details.get("model")
entities: list[PowerViewCover] = []
if model == "39":
entities.append(PowerViewCoverTiltOnly(coordinator))
else:
entities.append(PowerViewCover(coordinator))
async_add_entities(entities)
async_add_entities(
[PowerViewCoverTilt(coordinator)]
if coordinator.dev_details.get("model") in ["51", "62"]
else [PowerViewCover(coordinator)]
)
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
@@ -62,7 +58,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
) -> None:
"""Initialize the shade."""
LOGGER.debug("%s: init() PowerViewCover", coordinator.name)
self._attr_name = None
self._attr_name = CoverDeviceClass.SHADE
self._coord: PVCoordinator = coordinator
self._attr_device_info = self._coord.device_info
self._target_position: int | None = round(
@@ -107,7 +103,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
def supported_features(self) -> CoverEntityFeature: # type: ignore[reportIncompatibleVariableOverride]
"""Flag supported features, disable control if encryption is needed."""
if (
self._coord.data.get("home_id") and not self._coord.api.has_key
self._coord.data.get("home_id") and len(HOME_KEY) != 16
) or self._coord.data.get("battery_charging"):
return CoverEntityFeature(0)
@@ -201,7 +197,6 @@ class PowerViewCoverTilt(PowerViewCover):
self,
coordinator: PVCoordinator,
) -> None:
"""Initialize the shade with tilt."""
LOGGER.debug("%s: init() PowerViewCoverTilt", coordinator.name)
super().__init__(coordinator)
@@ -253,45 +248,3 @@ class PowerViewCoverTilt(PowerViewCover):
LOGGER.debug("close cover tilt")
_kwargs = {**kwargs, ATTR_TILT_POSITION: CLOSED_POSITION}
await self.async_set_cover_tilt_position(**_kwargs)
class PowerViewCoverTiltOnly(PowerViewCoverTilt):
"""Representation of a PowerView shade with additional tilt functionality."""
OPENCLOSED_THRESHOLD = 5
_attr_device_class = CoverDeviceClass.BLIND
_attr_supported_features = (
CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.STOP_TILT
| CoverEntityFeature.SET_TILT_POSITION
)
def __init__(
self,
coordinator: PVCoordinator,
) -> None:
"""Initialize the shade with tilt only."""
LOGGER.debug("%s: init() PowerViewCoverTiltOnly", coordinator.name)
super().__init__(coordinator)
@property
def is_opening(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is opening or not."""
return False
@property
def is_closing(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is closing or not."""
return False
@property
def is_closed(self) -> bool: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is closed."""
return isinstance(self.current_cover_tilt_position, int) and (
self.current_cover_tilt_position
>= OPEN_POSITION - PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
or self.current_cover_tilt_position
<= CLOSED_POSITION + PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
)

View File

@@ -16,5 +16,5 @@
"issue_tracker": "https://github.com/patman15/hdpv_ble/issues",
"loggers": ["hunterdouglas_powerview_ble"],
"requirements": ["cryptography>=43.0.0"],
"version": "0.24"
"version": "0.22"
}

View File

@@ -3,8 +3,14 @@
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
from homeassistant.components.sensor.const import SensorDeviceClass, SensorStateClass
from homeassistant.components.sensor import (
SensorEntity,
SensorEntityDescription,
)
from homeassistant.components.sensor.const import (
SensorDeviceClass,
SensorStateClass,
)
from homeassistant.const import (
ATTR_BATTERY_LEVEL,
PERCENTAGE,
@@ -61,7 +67,7 @@ class PVSensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], SensorEntity):
def __init__(
self, pv_dev: PVCoordinator, descr: SensorEntityDescription, unique_id: str
) -> None:
"""Initialize the BMS sensor."""
"""Intitialize the BMS sensor."""
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
self._attr_device_info = pv_dev.device_info
self.entity_description = descr

View File

@@ -2,73 +2,10 @@
"config": {
"flow_title": "Setup {name}",
"step": {
"user": {
"title": "Add PowerView Shade",
"menu_options": {
"select_device": "Select from discovered shades",
"manual": "Enter device details manually"
},
"menu_option_descriptions": {
"select_device": "Choose from shades detected via Bluetooth nearby.",
"manual": "Enter a Bluetooth MAC address and device name directly, for example if a shade is out of range of discovery."
}
},
"bluetooth_confirm": {
"description": "[%key:component::bluetooth::config::step::bluetooth_confirm::description%]"
},
"homekey": {
"title": "Configure HomeKey",
"description": "All shades on a PowerView network share the same encryption key. The recommended method is to fetch it from your G3 hub. You can also enter the key manually, or skip if your shades are unencrypted.",
"data": {
"key_method": "Key source",
"hub_url": "PowerView hub URL",
"home_key": "HomeKey (32 hex characters or \\xNN format)"
},
"data_description": {
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
}
},
"homekey_bluetooth": {
"title": "Configure HomeKey for {name}",
"description": "This shade uses encryption. The recommended method is to fetch the key from your G3 hub. You can also enter it manually, or skip (controls will be disabled until a key is configured).",
"data": {
"key_method": "Key source",
"hub_url": "PowerView hub URL",
"home_key": "HomeKey (32 hex characters or \\xNN format)"
},
"data_description": {
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
}
},
"auto_add_confirm": {
"description": "Do you want to set up {name}?"
},
"select_device": {
"title": "Select Shades",
"description": "Select the PowerView shades to add via Bluetooth.",
"data": {
"address": "Shades"
}
},
"manual": {
"title": "Enter Device Details",
"description": "Enter the device details manually.",
"data": {
"address": "Bluetooth MAC address (e.g. AA:BB:CC:DD:EE:FF)",
"ble_name": "BLE device name (e.g. DUE:94ED)"
}
}
},
"error": {
"invalid_key_format": "HomeKey must be a valid hexadecimal string",
"invalid_key_length": "HomeKey must be exactly 32 hex characters (16 bytes)",
"hub_connection_error": "Cannot connect to the PowerView hub",
"hub_http_error": "Hub returned an HTTP error",
"hub_timeout": "Connection to hub timed out",
"hub_protocol_error": "Hub returned an unexpected response"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]",

View File

@@ -1,75 +1,15 @@
{
"config": {
"flow_title": "Setup {name}",
"step": {
"user": {
"title": "Add PowerView Shade",
"menu_options": {
"select_device": "Select from discovered shades",
"manual": "Enter device details manually"
},
"menu_option_descriptions": {
"select_device": "Choose from shades detected via Bluetooth nearby.",
"manual": "Enter a Bluetooth MAC address and device name directly, for example if a shade is out of range of discovery."
}
},
"bluetooth_confirm": {
"description": "Do you want to set up {name}?"
},
"homekey": {
"title": "Configure HomeKey",
"description": "All shades on a PowerView network share the same encryption key. The recommended method is to fetch it from your G3 hub. You can also enter the key manually, or skip if your shades are unencrypted.",
"data": {
"key_method": "Key source",
"hub_url": "PowerView hub URL",
"home_key": "HomeKey (32 hex characters or \\xNN format)"
},
"data_description": {
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
}
},
"homekey_bluetooth": {
"title": "Configure HomeKey for {name}",
"description": "This shade uses encryption. The recommended method is to fetch the key from your G3 hub. You can also enter it manually, or skip (controls will be disabled until a key is configured).",
"data": {
"key_method": "Key source",
"hub_url": "PowerView hub URL",
"home_key": "HomeKey (32 hex characters or \\xNN format)"
},
"data_description": {
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
}
},
"select_device": {
"title": "Select Shades",
"description": "Select the PowerView shades to add via Bluetooth.",
"data": {
"address": "Shades"
}
},
"manual": {
"title": "Enter Device Details",
"description": "Enter the device details manually.",
"data": {
"address": "Bluetooth MAC address (e.g. AA:BB:CC:DD:EE:FF)",
"ble_name": "BLE device name (e.g. DUE:94ED)"
}
}
},
"error": {
"invalid_key_format": "HomeKey must be a valid hexadecimal string",
"invalid_key_length": "HomeKey must be exactly 32 hex characters (16 bytes)",
"hub_connection_error": "Cannot connect to the PowerView hub",
"hub_http_error": "Hub returned an HTTP error",
"hub_timeout": "Connection to hub timed out",
"hub_protocol_error": "Hub returned an unexpected response"
},
"abort": {
"already_configured": "Device is already configured",
"no_devices_found": "No devices found on the network",
"not_supported": "Device not supported"
},
"flow_title": "Setup {name}",
"step": {
"bluetooth_confirm": {
"description": "Do you want to set up {name}?"
}
}
}
}

View File

@@ -2,12 +2,6 @@
* Emulate a Hunter Douglas PowerView cover device using ESP32
* used e.g. to gain the home_key from an existing installation via BLE
*
* REQUIRES:
* - ESP32 Board Definitions 3.0.x (tested on 3.0.7)
* - WolfSSL 5.7.x (tested on 5.7.6)
* - Phone Region: Potentially an alternative region depending on the app response
* - e.g. To add Parkland shades in the US, phone region set to the UK temporarily
*
* TODO:
* - cleanup code
* - think about emulating a remote
@@ -17,13 +11,9 @@
*/
#define NAME "myPVcover"
const uint16_t SW_VERSION = 391;
const char *SERIAL_NR = "01234567890ABCDEF";
const uint16_t TYP_ID = 42; // 62
const uint16_t MODEL_ID = 224;
const uint16_t FW_REVISION = 27;
const uint32_t HW_REVISION = 171103;
const uint8_t BATTERY_LEVEL = 42;
#define FW_VERSION "391"
#define SERIAL_NR "01234567890ABCDEF"
#include <BLEDevice.h>
#include <BLEServer.h>
@@ -50,11 +40,7 @@ int devId = INVALID_DEVID; //if not using async INVALID_DEVID is default
#define DEV_SERVICE_UUID BLEUUID("180A")
#define SER_CHAR_UUID BLEUUID("2A25")
#define MAN_CHAR_UUID BLEUUID("2A29")
#define MOD_CHAR_UUID BLEUUID("2A24")
#define FWR_CHAR_UUID BLEUUID("2A26")
#define HWR_CHAR_UUID BLEUUID("2A27")
#define SWR_CHAR_UUID BLEUUID("2A28")
#define SWC_CHAR_UUID BLEUUID("2A28")
#define BAT_SERVICE_UUID BLEUUID("180F")
#define BAT_CHAR_UUID BLEUUID("2A19")
@@ -83,7 +69,7 @@ struct notification {
};
BLECharacteristic *pCharacteristic_cover, *pCharacteristic_fw, *pCharacteristic_unknown, *pCharacteristic_bat;
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser, *pCharacteristic_man, *pCharacteristic_mod, *pCharacteristic_fwr, *pCharacteristic_hwr;
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser;
BLEServer *pServer = NULL;
bool deviceConnected = false;
bool oldDeviceConnected = false;
@@ -172,12 +158,12 @@ void decode(BLECharacteristic *pChar) {
memcpy((void *)&msg, data_dec, 4);
Serial.printf("\t message: SRV: %02x, CMD %02x, SEQ %i, LEN %i\n", msg.serviceID, msg.cmdID, msg.sequence, msg.data_len);
// special responses (static data!)
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, (uint8_t)(SW_VERSION & 0xFF), (uint8_t)(SW_VERSION >> 8), 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, TYP_ID, MODEL_ID, 0x08 }; // HW diagnostics
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
// sepecial responses (static data!)
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x2a, 0xe0, 0x08 }; // HW diagnostics
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
Serial.print("\t\t");
switch ((msg.serviceID << 8) | msg.cmdID) {
@@ -193,7 +179,7 @@ void decode(BLECharacteristic *pChar) {
break;
case 0xF711:
// identify
Serial.printf("identify: %i times\n", data_dec[4]);
Serial.printf("identify: %i\n", data_dec[4]);
resp_size = set_response(&response, (const message *)data_dec);
break;
case 0xF7B8:
@@ -222,8 +208,8 @@ void decode(BLECharacteristic *pChar) {
case 0xFB02:
// set shade key
Serial.print("set shade key: ");
print_hex(&data_raw[4], data_len - 4, "\\x", "");
// set response before key, to acknowledge unencrypted
print_hex(&data_raw[4], data_len - 4, "\\x");
// set resonse before key, to acknowledge unencrypted
resp_size = set_response(&response, (const message *)data_dec);
if (msg.data_len == 16) {
memcpy(home_key, &data_raw[4], 16);
@@ -265,9 +251,7 @@ void decode(BLECharacteristic *pChar) {
resp_size = set_response(&response, (const message *)data_dec);
break;
default:
Serial.println(F("*********************************** unknown message (try ACK)"));
resp_size = set_response(&response, (const message *)data_dec);
break;
Serial.println(F("*********************************** unknown message"));
}
if (resp_size) {
pChar->setValue((uint8_t *)&response, resp_size);
@@ -307,7 +291,7 @@ class coverCallbacks : public BLECharacteristicCallbacks {
void onNotify(BLECharacteristic *pCharacteristic) {
Serial.printf("Cover onNotify() %s\n", pCharacteristic->toString().c_str());
}
void onStatus(BLECharacteristic *pCharacteristic, Status s, uint32_t code) {
Serial.printf("Cover onStatus() %s: %s\n", BLEstate[s], pCharacteristic->toString().c_str());
}
@@ -357,7 +341,6 @@ class genericCallbacks : public BLECharacteristicCallbacks {
void setup() {
Serial.begin(115200);
delay(1000); // wait for terminal to be ready
Serial.println(NAME " initializing ...");
BLEDevice::init(NAME);
@@ -390,7 +373,8 @@ pDesc1->setValue("cover");*/
BAT_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_bat->setCallbacks(new batteryCallbacks());
pCharacteristic_bat->setValue((uint8_t *)&BATTERY_LEVEL, 1);
uint8_t battery_level = 42;
pCharacteristic_bat->setValue(&battery_level, 1);
pCharacteristic_bat->addDescriptor(new BLE2902());
BLEService *pFWService = pServer->createService(FW_SERVICE_UUID);
@@ -406,9 +390,9 @@ pDesc1->setValue("cover");*/
BLEService *pDEVService = pServer->createService(DEV_SERVICE_UUID);
pCharacteristic_dev = pDEVService->createCharacteristic(
SWR_CHAR_UUID,
SWC_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_dev->setValue(String(SW_VERSION));
pCharacteristic_dev->setValue(FW_VERSION);
pCharacteristic_dev->setCallbacks(new genericCallbacks());
pCharacteristic_ser = pDEVService->createCharacteristic(
SER_CHAR_UUID,
@@ -416,28 +400,6 @@ pDesc1->setValue("cover");*/
pCharacteristic_ser->setValue(SERIAL_NR);
pCharacteristic_ser->setCallbacks(new genericCallbacks());
pCharacteristic_man = pDEVService->createCharacteristic(
MAN_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_man->setValue("Hunter Douglas");
pCharacteristic_man->setCallbacks(new genericCallbacks());
pCharacteristic_mod = pDEVService->createCharacteristic(
MOD_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_mod->setValue(String(TYP_ID));
pCharacteristic_mod->setCallbacks(new genericCallbacks());
pCharacteristic_fwr = pDEVService->createCharacteristic(
FWR_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_fwr->setValue(String(FW_REVISION));
pCharacteristic_fwr->setCallbacks(new genericCallbacks());
pCharacteristic_hwr = pDEVService->createCharacteristic(
HWR_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_hwr->setValue(String(HW_REVISION));
pCharacteristic_hwr->setCallbacks(new genericCallbacks());
// Start the services
pCovService->start();
pFWService->start();
@@ -446,10 +408,9 @@ pDesc1->setValue("cover");*/
// Start advertising
BLEAdvertisementData AdvertisementData;
const char adv[] = { 0x19, 0x08, 0x00, 0x00, TYP_ID, 0x00, 0x00, 0x00, 0x00, 0x00, 0xA2 };
// Hunter Douglas ^^ -- ^^ ^^key ^^ ^--pos1--^
AdvertisementData.setManufacturerData(String(adv, 11));
const String manufacturerData = String("\x19\x08\x00\x00\x2A\x00\x00\x00\x00\x00\xA2", 11);
// Hunter Douglas ^^--^^ ^^key^^ ^^ ID-Type
AdvertisementData.setManufacturerData(manufacturerData);
AdvertisementData.setPartialServices(BLEUUID(COVER_SERVICE_UUID));
AdvertisementData.setFlags((1 << 2) | (1 << 1)); // [BR/EDR Not Supported] | [LE General Discoverable Mode]

Binary file not shown.

Before

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 53 KiB

View File

@@ -1,17 +1,7 @@
# pyproject.toml
[project]
name = "hunterdouglas_powerview_ble"
classifiers = [
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.13"
]
readme = "README.md"
requires-python = ">=3.13.2"
[project.urls]
"Source Code" = "https://github.com/patman15/hdpv_ble/"
"Bug Reports" = "https://github.com/patman15/hdpv_ble/issues"
requires-python = ">=3.12.0"
[tool.pytest.ini_options]
minversion = "8.0"
@@ -24,9 +14,9 @@ testpaths = [
]
asyncio_mode = "auto"
# ruff settings from HA 2025.2.2
# ruff configuration taken from HA 2024.11.2 (less ignores)
[tool.ruff]
required-version = ">=0.9.1"
required-version = ">=0.6.8"
[tool.ruff.lint]
select = [
@@ -45,10 +35,8 @@ select = [
"B017", # pytest.raises(BaseException) should be considered evil
"B018", # Found useless attribute access. Either assign it to a variable or remove it.
"B023", # Function definition does not bind loop variable {name}
"B024", # `{name}` is an abstract base class, but it has no abstract methods or properties
"B026", # Star-arg unpacking after a keyword argument is strongly discouraged
"B032", # Possible unintentional type annotation (using :). Did you mean to assign (using =)?
"B035", # Dictionary comprehension uses static key
"B904", # Use raise from to specify exception cause
"B905", # zip() without an explicit strict= parameter
"BLE",
@@ -82,27 +70,12 @@ select = [
"RSE", # flake8-raise
"RUF005", # Consider iterable unpacking instead of concatenation
"RUF006", # Store a reference to the return value of asyncio.create_task
"RUF007", # Prefer itertools.pairwise() over zip() when iterating over successive pairs
"RUF008", # Do not use mutable default values for dataclass attributes
"RUF010", # Use explicit conversion flag
"RUF013", # PEP 484 prohibits implicit Optional
"RUF016", # Slice in indexed access to type {value_type} uses type {index_type} instead of an integer
"RUF017", # Avoid quadratic list summation
"RUF018", # Avoid assignment expressions in assert statements
"RUF019", # Unnecessary key check before dictionary access
"RUF020", # {never_like} | T is equivalent to T
"RUF021", # Parenthesize a and b expressions when chaining and and or together, to make the precedence clear
"RUF022", # Sort __all__
"RUF023", # Sort __slots__
"RUF024", # Do not pass mutable objects as values to dict.fromkeys
"RUF026", # default_factory is a positional-only argument to defaultdict
"RUF030", # print() call in assert statement is likely unintentional
"RUF032", # Decimal() called with float literal argument
"RUF033", # __post_init__ method with argument defaults
"RUF034", # Useless if-else condition
"RUF100", # Unused `noqa` directive
"RUF101", # noqa directives that use redirected rule codes
"RUF200", # Failed to parse pyproject.toml: {message}
# "RUF100", # Unused `noqa` directive; temporarily every now and then to clean them up
"S102", # Use of exec detected
"S103", # bad-file-permissions
"S108", # hardcoded-temp-file
@@ -115,7 +88,7 @@ select = [
"S317", # suspicious-xml-sax-usage
"S318", # suspicious-xml-mini-dom-usage
"S319", # suspicious-xml-pull-dom-usage
# "S320", # suspicious-xmle-tree-usage
"S320", # suspicious-xmle-tree-usage
"S601", # paramiko-call
"S602", # subprocess-popen-with-shell-equals-true
"S604", # call-with-shell-equals-true
@@ -126,7 +99,7 @@ select = [
"SLOT", # flake8-slots
"T100", # Trace found: {name} used
"T20", # flake8-print
"TC", # flake8-type-checking
"TCH", # flake8-type-checking
"TID", # Tidy imports
"TRY", # tryceratops
"UP", # pyupgrade
@@ -146,12 +119,13 @@ ignore = [
# "PLC1901", # {existing} can be simplified to {replacement} as an empty string is falsey; too many false positives
# "PLR0911", # Too many return statements ({returns} > {max_returns})
# "PLR0912", # Too many branches ({branches} > {max_branches})
"PLR0913", # Too many arguments to function call ({c_args} > {max_args})
# "PLR0913", # Too many arguments to function call ({c_args} > {max_args})
# "PLR0915", # Too many statements ({statements} > {max_statements})
"PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable
# "PLW2901", # Outer {outer_kind} variable {name} overwritten by inner {inner_kind} target
# "PT004", # Fixture {fixture} does not return anything, add leading underscore
# "PT011", # pytest.raises({exception}) is too broad, set the `match` parameter or use a more specific exception
"PT018", # Assertion should be broken down into multiple parts
# "PT018", # Assertion should be broken down into multiple parts
# "RUF001", # String contains ambiguous unicode character.
# "RUF002", # Docstring contains ambiguous unicode character.
# "RUF003", # Comment contains ambiguous unicode character.
@@ -162,12 +136,14 @@ ignore = [
# "SIM115", # Use context handler for opening files
# Moving imports into type-checking blocks can mess with pytest.patch()
"TC001", # Move application import {} into a type-checking block
"TC002", # Move third-party import {} into a type-checking block
"TC003", # Move standard library import {} into a type-checking block
"TCH001", # Move application import {} into a type-checking block
"TCH002", # Move third-party import {} into a type-checking block
"TCH003", # Move standard library import {} into a type-checking block
"TRY003", # Avoid specifying long messages outside the exception class
"TRY400", # Use `logging.exception` instead of `logging.error`
# Ignored due to performance: https://github.com/charliermarsh/ruff/issues/2923
"UP038", # Use `X | Y` in `isinstance` call instead of `(X, Y)`
# May conflict with the formatter, https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
"W191",
@@ -185,14 +161,3 @@ ignore = [
"PLE0605"
]
[tool.ruff.lint.isort]
force-sort-within-sections = true
known-first-party = [
"homeassistant",
]
combine-as-imports = true
split-on-trailing-comma = false
[tool.ruff.lint.per-file-ignores]
"scripts/*" = ["T201"]

View File

@@ -1,6 +1,4 @@
homeassistant==2025.11.0
homeassistant==2024.11.0
pip>=21.3.1
ruff>=0.9.1,<=0.15.0
types-requests
mypy~=1.19.1
codespell
ruff==0.6.8

View File

@@ -2,9 +2,9 @@
wheel
home-assistant-bluetooth
habluetooth>=5.3.0
habluetooth>=3.6.0
bluetooth-adapters
pytest>=8.4.1
pytest>=8.3.3
pytest-cov>=5.0.0
pytest-socket>=0.7.0
pytest-asyncio>=0.24.0
@@ -16,10 +16,10 @@ aiohttp
aiohttp_cors
aiohttp-fast-url-dispatcher
aiohttp-zlib-ng
bleak>=1.0.1
bleak-retry-connector>=4.4.3
bleak>=0.22.3
bleak-retry-connector>=3.6.0
bluetooth-data-tools
pyserial-asyncio
pyudev
pytest-homeassistant-custom-component==0.13.294
pytest-homeassistant-custom-component==0.13.181

View File

@@ -1 +0,0 @@
"""Script to extract PowerView homekey from a G3 PowerView Gateway."""

View File

@@ -1,119 +0,0 @@
"""Extract PowerView homekey from a G3 PowerView Gateway."""
import base64
import json
import struct
from typing import Any, Final
import requests
HUB: Final[str] = "http://powerview-g3.local"
TIMEOUT: Final[int] = 10
def create_request(sid: int, cid: int, sequence_id: int, data: bytes) -> bytes:
"""Assemble a request frame for the PowerView protocol."""
return struct.pack("<BBBB", sid, cid, sequence_id, len(data)) + data
def decode_response(packet: bytes) -> dict[str, Any]:
"""Decode a response frame from the PowerView protocol."""
if len(packet) < 4:
raise ValueError("Packet size too small")
sid, cid, sequence_id, length = struct.unpack("<BBBB", packet[0:4])
if len(packet) != 4 + length:
raise ValueError("Not all data present")
if length < 1:
raise ValueError("No errorCode present")
(error_code,) = struct.unpack("<B", packet[4:5])
data: Final[bytes] = packet[5:]
return {
"cid": cid,
"sid": sid,
"sequenceId": sequence_id,
"errorCode": error_code,
"data": data,
}
def create_get_shade_key_request(sequence_id) -> bytes:
"""Create a GetShadeKey request frame."""
return create_request(251, 18, sequence_id, b"")
def get_shade_key(hub: str, ble_name) -> bytes:
"""Get the homekey for a shade."""
try:
shades_exec_resp: requests.Response = requests.post(
hub + "/home/shades/exec?shades=" + ble_name,
json={"hex": create_get_shade_key_request(1).hex()},
timeout=TIMEOUT,
)
shades_exec_resp.raise_for_status()
except requests.exceptions.RequestException as ex:
print(f"Unable to send GetShadeKey {ex!s}")
raise
result: dict = json.loads(shades_exec_resp.content)
responses = result.get("responses", [])
if len(responses) != 1 or "hex" not in responses[0]:
raise OSError(f"Error when attempting GetShadeKey: {result}")
response: Final[bytes] = bytes.fromhex(responses[0]["hex"])
dec_resp: Final[dict[str, Any]] = decode_response(response)
if dec_resp["errorCode"] != 0:
raise ValueError(
f"BLE errorCode={dec_resp['errorCode']} data={dec_resp['data'].hex()}"
)
if len(dec_resp["data"]) != 16:
raise ValueError("Expected 16 byte homekey")
return dec_resp["data"]
def main(hub: str) -> int:
"""Extract the homekeys from all shades."""
try:
shades_resp: requests.Response = requests.get(
hub + "/home/shades", timeout=TIMEOUT
)
shades_resp.raise_for_status()
except requests.exceptions.RequestException as ex:
print(f"Unable to get list of shades:\n\t{ex!s}")
return -1
shades = json.loads(shades_resp.content)
print(f"Found {len(shades)} shades, interrogating")
network_key: bytes | None = None
for shade in shades:
name: str = base64.b64decode(shade["name"]).decode("utf-8")
try:
key: bytes = get_shade_key(hub, shade["bleName"])
network_key = key
except (OSError, ValueError) as ex:
if network_key is not None:
key = network_key
print(f"Shade '{name}':")
print(f"\tBLE name: '{shade['bleName']}'")
print(f"\tHomeKey: {key.hex()} (shade unreachable, using network key)")
else:
print(f"Shade '{name}':")
print(f"\tBLE name: '{shade['bleName']}'")
print(f"\tHomeKey: ERROR - {ex}")
continue
print(f"Shade '{name}':")
print(f"\tBLE name: '{shade['bleName']}'")
print(f"\tHomeKey: {key.hex()}")
return 0
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser(
description="Extract PowerView homekey from a G3 PowerView Gateway"
)
parser.add_argument("hub", nargs="?", help="URL to HUB", default=HUB)
args = parser.parse_args()
sys.exit(main(**vars(args)))