39 Commits

Author SHA1 Message Date
Richard Mann
acb2c5ff52 fix: linting 2026-04-09 08:20:01 +10:00
Richard Mann
7a2bf2193a feat: add shade capabilities, velocity control, and cleanup
Add ShadeCapability lookup to replace hardcoded model string checks for
tilt/tilt-only selection. Add velocity number entity (0–100) and pass
velocity through all movement paths including open/close. Remove
redundant device_info property overrides and deduplicate hex parsing.
2026-04-09 08:15:10 +10:00
Richard Mann
f260416676 fix: linting and hass validation issues 2026-04-09 07:18:54 +10:00
Richard Mann
abb0a3e8a3 fix: allow option to setup found devices or add a new device plus updated descriptions 2026-04-07 08:13:13 +10:00
Richard Mann
652337e32c fix: linting and formatting 2026-04-06 15:13:21 +10:00
Richard Mann
af08d18d62 fix: area assignment for multiple devices 2026-04-06 15:01:52 +10:00
Richard Mann
894580c20b fix: device and entity naming 2026-04-06 12:17:19 +10:00
Richard Mann
317b450702 fix: handle stale ble devices 2026-04-06 11:51:01 +10:00
Richard Mann
31185a4446 Improve config flow UX for multi-shade setups
Reuse the home key from already-configured shades so adding subsequent
shades skips the key step. Show human-readable shade names from the hub
in the device picker. Allow selecting multiple shades at once instead of
repeating the flow for each one. Default to hub fetch as the key method.
2026-04-06 09:07:16 +10:00
Patrick
04c7036351 Revise warning about HOME_KEY loss in README
Update warning message in README to reflect changes.
2026-01-01 13:10:48 +01:00
Patrick
d66cf61887 fix position factor 2025-12-30 12:14:58 +01:00
Dustin Brewer
5d498b8753 Initial support for Parkland/TypeID 39 shades (#12)
* disabled home_id filter

* stronger typing

* Update .gitignore

* extend set_position() for all values

* support setting tilt

* completed tilt control

* add tilt functions

* Initial support for Parkland/TypeID 39 shades

* Update cover.py

* Update README.md

* simplify set_position()

* fix spelling issues

* Update .gitignore

* remove unverified shade types

* clean code

* fix ruff

* update linting

* Update lint.yml

---------

Co-authored-by: patman15 <14628713+patman15@users.noreply.github.com>
2025-12-30 10:00:39 +01:00
Patrick
b558083b50 Added known issues 2025-12-30 08:46:09 +01:00
Patrick
3775496936 Clean-up code and upgrade dependencies (#20)
* Update pyproject.toml

* stronger typing

* fix type annotations

* update dependencies

* fix spelling

* add missing info to pyproject.toml

* code cleanup

* add project URLs

* fix mypy issues

* update HA to 2025.11

* upgrade Python to 3.13.2 to match HA

* Update lint.yml
2025-12-29 19:36:14 +01:00
Patrick
883aca753e remove trailing spaces 2025-08-09 11:47:35 +02:00
patman15
fe3646df27 fix #15, missing service discovery 2025-08-09 10:57:32 +02:00
patman15
bae4158a3c Update PV_BLE_cover.ino
add default ACK for unknown messages
2025-06-02 18:15:57 +02:00
patman15
d9ebc54026 Merge branch 'main' of https://github.com/patman15/hdpv_ble 2025-06-02 17:57:32 +02:00
Patrick
62bbfd7361 Update README.md 2025-01-16 08:10:12 +01:00
Patrick
30dff09bb1 Merge pull request #2 from Frans-Willem/main
Helper script to extract homekey(s) from PowerView gateway gen3
2025-01-16 08:07:49 +01:00
patman15
74d906151e add constants and more characteristics 2025-01-06 19:21:27 +01:00
patman15
9773e5df65 add icons 2025-01-06 19:11:53 +01:00
Patrick
b2d5335e1d Rename __init.py__ to __init__.py 2025-01-02 10:44:03 +01:00
patman15
a6aaf4d727 Create __init.py__ 2025-01-02 10:41:29 +01:00
patman15
f2ad61a016 Update extract_gateway3_homekey.py 2025-01-02 10:37:34 +01:00
Patrick
f6ec17c9b2 Merge branch 'main' into main 2025-01-02 10:34:01 +01:00
patman15
d79096357d modified ignore pattern 2025-01-02 10:32:30 +01:00
patman15
52f7390fc0 fixed ruff 2025-01-02 10:30:23 +01:00
patman15
c9a27388af Update extract_gateway3_homekey.py 2025-01-02 10:21:36 +01:00
patman15
1590647c9e Update README.md 2024-12-30 18:22:30 +01:00
patman15
1559776aa8 document key extraction 2024-12-30 18:10:51 +01:00
Patrick
2c03881b60 Update README.md 2024-12-30 17:26:33 +01:00
Patrick
0bc5644883 Merge pull request #6 from patman15/feature/identify
Support identification
2024-12-30 17:25:28 +01:00
patman15
e97eef94f8 update emulator 2024-12-30 17:24:22 +01:00
patman15
4b51ea514f fix ruff 2024-12-30 17:24:14 +01:00
patman15
591815652d support identification 2024-12-30 17:20:11 +01:00
patman15
ba487d907d debug enhancement 2024-12-30 13:39:33 +01:00
Patrick
8b0bccee6b Update README.md 2024-12-30 11:45:10 +01:00
Frans-Willem Hardijzer
054f35f838 Helper script to extract homekey(s) from PowerView Gateway Gen3 2024-10-14 14:45:14 +02:00
27 changed files with 1345 additions and 210 deletions

View File

@@ -8,8 +8,8 @@ on:
- cron: '0 5 * * 6'
jobs:
ruff:
name: "Ruff"
lint:
name: "Lint the code"
runs-on: "ubuntu-latest"
steps:
- name: "Checkout the repository"
@@ -18,11 +18,17 @@ jobs:
- name: "Set up Python"
uses: actions/setup-python@main
with:
python-version: "3.12"
python-version: "3.13.2"
cache: "pip"
- name: "Install requirements"
run: python3 -m pip install -r requirements.txt
- name: "Run Ruff"
run: python3 -m ruff check .
- name: "Run ruff"
run: ruff check .
- name: "Run mypy"
run: mypy .
- name: "Run codespell"
run: codespell -L hass

View File

@@ -8,8 +8,7 @@
> [!WARNING]
> - This integration is under development!
> - Test coverage is low, malfunction might occur.
> - Only devices that are **not** added to the app are controlable. It is possible to add them to the app if you just want to monitor the status (position, battery) in Home Assistant.
> - Currently only position change is supported (e.g., no tilt)
> - The HOME_KEY is lost over updates!
## Features
- Zero configuration
@@ -18,7 +17,7 @@
### Supported Devices
Type* | Description
-- | --
-- | --
1 | Designer Roller
4 | Roman
5 | Bottom Up
@@ -26,6 +25,7 @@ Type* | Description
10 | Duette and Applause SkyLift
19 | Provenance Woven Wood
31, 32, 84 | Vignette
39 | Parkland
42 | M25T Roller Blind
49 | AC Roller
52 | Banded Shades
@@ -39,10 +39,14 @@ The integration provides the following information about the battery
Platform | Description | Unit | Details
-- | -- | -- | --
`binary_sensor` | battery charging indicator | `bool` | true if battery is charging
`button` | identify shade | - | identify shade by LED and 3 beeps
`cover` | view/control position | `%` | percentage cover is open (100% is open)
`sensor` | SoC (state of charge) | `%` | range 100% (full), 50%, 20%, 0% (battery empty)
## Installation
> [!IMPORTANT]
> In case you added your shades to the app or a gateway, you need to [set the encryption key](#set-the-encryption-key) manually in the [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py) file after **each** update!
### Automatic
Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom repository](https://hacs.xyz/docs/faq/custom_repositories/).
@@ -57,12 +61,22 @@ Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom rep
1. Restart Home Assistant
1. In the HA UI go to "Configuration" -> "Integrations" click "+" and search for "Hunter Douglas PowerView (BLE)"
## Set the Encryption Key
Currently, there are three methods to obtain the key:
## Outlook
- Add support for encryption
- Allow parallel usage to PowerView app as "remote"
- Add support for tilt function
- Add support for further device types
1. Via adopting a BLE shade: There is a [shade emulator](/emu/PV_BLE_cover) that works with Arduino IDE and an ESP32 device (&ge; 2MiB flash, &ge; 128KiB required), e.g. [Adafruit QT Py ESP32-S3](https://www.adafruit.com/product/5426). Install and connect via serial port, then go to the PowerView app and add the shade `myPVcover` to your home. You will see a log message `set shade key: \xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx` . Copy this key. You can delete the shade from the app when done.
2. Extracting from gateway: This [script](scripts/extract_gateway3_homekey.py) is able to extract the key from a working PowerView gateway.
3. Grabbing from the app: Checkout this [post in the Home Assistant community forum](https://community.home-assistant.io/t/hunter-douglas-powerview-gen-3-integration/424836/228).
Finally, you need to manually copy the key to [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py).
> [!IMPORTANT]
> You need to update the file after **each** update!
## Known Issues
<details><summary>Shade inoperable after charging</summary>
It seems that the shades require some re-initialization after charging. The solution is currently unknown, but as a workaround you can operate the shade ones using the vendor app.
</details>
## Troubleshooting
In case you have severe troubles,
@@ -72,6 +86,15 @@ In case you have severe troubles,
- disable the log (Home Assistant will prompt you to download the log), and finally
- [open an issue](https://github.com/patman15/hdpv_ble/issues/new?assignees=&labels=Bug&projects=&template=bug.yml) with a good description of what happened and attach the log.
# Thanks To
[@mannkind](https://github.com/mannkind)
[license-shield]: https://img.shields.io/github/license/patman15/hdpv_ble.svg?style=for-the-badge
[releases-shield]: https://img.shields.io/github/release/patman15/hdpv_ble.svg?style=for-the-badge
[releases]: https://github.com//patman15/hdpv_ble/releases
## Outlook
- Add tests!
- Allow parallel usage to PowerView app as "remote"
- Add support for tilt function
- Add support for further device types

View File

@@ -4,17 +4,25 @@
@license: Apache-2.0 license
"""
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError
from homeassistant.components.bluetooth import async_ble_device_from_address
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
from .const import DOMAIN, LOGGER
from .const import LOGGER
from .coordinator import PVCoordinator
PLATFORMS: list[Platform] = [Platform.BINARY_SENSOR, Platform.COVER, Platform.SENSOR]
PLATFORMS: list[Platform] = [
Platform.BINARY_SENSOR,
Platform.BUTTON,
Platform.COVER,
Platform.NUMBER,
Platform.SENSOR,
]
type ConfigEntryType = ConfigEntry[PVCoordinator]
@@ -26,7 +34,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool
if entry.unique_id is None:
raise ConfigEntryError("Missing unique ID for device.")
ble_device = async_ble_device_from_address(
ble_device: BLEDevice | None = async_ble_device_from_address(
hass=hass, address=entry.unique_id, connectable=True
)
@@ -35,14 +43,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool
f"Could not find PowerView device ({entry.unique_id}) via Bluetooth"
)
coordinator = PVCoordinator(hass, ble_device, entry.data.copy())
coordinator = PVCoordinator(hass, ble_device, entry.data.copy(), entry.title)
try:
await coordinator.query_dev_info()
except BleakError as err:
raise ConfigEntryNotReady("Unable to query device info.") from err
# Insert the coordinator in the global registry
hass.data.setdefault(DOMAIN, {})
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(coordinator.async_start())

View File

@@ -1,10 +1,10 @@
"""Hunter Douglas PowerView BLE API."""
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from typing import Final
import time
from typing import Final, NamedTuple
from bleak import BleakClient
from bleak.backends.device import BLEDevice
@@ -12,7 +12,15 @@ from bleak.exc import BleakError
from bleak.uuids import normalize_uuid_str
from bleak_retry_connector import establish_connection
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from homeassistant.components.cover import ATTR_CURRENT_POSITION
from cryptography.hazmat.primitives.ciphers.base import (
AEADDecryptionContext,
AEADEncryptionContext,
)
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
)
from .const import LOGGER, TIMEOUT
@@ -25,6 +33,7 @@ ATTR_ACTIVITY: Final[str] = "activity"
SHADE_TYPE: Final[dict[int, str]] = {
# up down only
1: "Designer Roller",
4: "Roman",
5: "Bottom Up",
@@ -38,8 +47,42 @@ SHADE_TYPE: Final[dict[int, str]] = {
52: "Banded Shades",
53: "Sonnette",
84: "Vignette",
# top down bottom up
8: "Duette, Top Down Bottom Up",
9: "Duette DuoLite, Top Down Bottom Up",
33: "Duette Architella, Top Down Bottom Up",
39: "Parkland",
47: "Pleated, Top Down Bottom Up",
# top down, tilt anywhere
51: "Venetian, Tilt Anywhere",
62: "Venetian, Tilt Anywhere",
}
class ShadeCapability(NamedTuple):
"""Capability flags for a shade type."""
has_tilt: bool = False
tilt_only: bool = False
SHADE_CAPABILITIES: Final[dict[int, ShadeCapability]] = {
# tilt anywhere (position + tilt)
51: ShadeCapability(has_tilt=True),
62: ShadeCapability(has_tilt=True),
# tilt only (no position movement)
39: ShadeCapability(has_tilt=True, tilt_only=True),
}
_DEFAULT_CAPABILITY: Final[ShadeCapability] = ShadeCapability()
def get_shade_capabilities(type_id: int | None) -> ShadeCapability:
"""Return shade capabilities for a given type_id."""
if type_id is None:
return _DEFAULT_CAPABILITY
return SHADE_CAPABILITIES.get(type_id, _DEFAULT_CAPABILITY)
OPEN_POSITION: Final[int] = 100
CLOSED_POSITION: Final[int] = 0
@@ -58,6 +101,7 @@ class ShadeCmd(Enum):
SET_POSITION = 0x01F7
STOP = 0xB8F7
ACTIVATE_SCENE = 0xBAF7
IDENTIFY = 0x11F7
@dataclass
@@ -78,26 +122,18 @@ class PowerViewBLE:
def __init__(self, ble_device: BLEDevice, home_key: bytes = b"") -> None:
"""Initialize device API via Bluetooth."""
self._ble_device: Final[BLEDevice] = ble_device
self._ble_device: BLEDevice = ble_device
self.name: Final[str] = self._ble_device.name or "unknown"
self._seqcnt: int = 1
self._client: BleakClient = BleakClient(
self._ble_device,
disconnected_callback=self._on_disconnect,
services=[
UUID_COV_SERVICE,
# self.UUID_DEV_SERVICE,
# self.UUID_BAT_SERVICE,
],
)
self._client: BleakClient = BleakClient(self._ble_device)
self._data_event = asyncio.Event()
self._data: bytearray
self._data: bytes = b""
self._info: PVDeviceInfo = PVDeviceInfo()
self._cmd_lock: Final = asyncio.Lock()
self._cmd_next = None
self._is_encrypted: bool = False
self._cipher: Final = (
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
self._cmd_lock: Final = asyncio.Lock()
self._cmd_next: tuple[ShadeCmd, bytes]
self._cipher: Final[Cipher | None] = (
Cipher(algorithms.AES(home_key), modes.CTR(bytes(16)))
if len(home_key) == 16
else None
)
@@ -106,15 +142,24 @@ class PowerViewBLE:
await self._data_event.wait()
self._data_event.clear()
def set_ble_device(self, ble_device: BLEDevice) -> None:
"""Update the BLE device reference (e.g. when proxy details change)."""
self._ble_device = ble_device
@property
def encrypted(self) -> bool:
"""Return whether communication with this shade is encrypted."""
return self._is_encrypted
@encrypted.setter
def encrypted(self, value:bool) -> None:
def encrypted(self, value: bool) -> None:
self._is_encrypted = value
@property
def has_key(self) -> bool:
"""Return True if a valid homekey was provided."""
return self._cipher is not None
@property
def info(self) -> PVDeviceInfo:
"""Return device information, e.g. SW version."""
@@ -126,9 +171,7 @@ class PowerViewBLE:
return self._client.is_connected
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
async def _cmd(
self, cmd: tuple[ShadeCmd, bytearray], disconnect: bool = True
) -> None:
async def _cmd(self, cmd: tuple[ShadeCmd, bytes], disconnect: bool = True) -> None:
self._cmd_next = cmd
if self._cmd_lock.locked():
LOGGER.debug("%s: device busy, queuing %s command", self.name, cmd[0])
@@ -137,19 +180,18 @@ class PowerViewBLE:
async with self._cmd_lock:
try:
await self._connect()
cmd_run = self._cmd_next
tx_data = (
bytearray(
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
+ bytes([self._seqcnt, len(cmd_run[1])])
)
cmd_run: tuple[ShadeCmd, bytes] = self._cmd_next
tx_data: bytes = bytes(
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
+ bytes([self._seqcnt, len(cmd_run[1])])
+ cmd_run[1]
)
LOGGER.debug("sending cmd: %s", tx_data.hex(" "))
if self._cipher is not None and self._is_encrypted:
enc = self._cipher.encryptor()
enc: AEADEncryptionContext = self._cipher.encryptor()
tx_data = enc.update(tx_data) + enc.finalize()
LOGGER.debug(" encrypted: %s", tx_data.hex(" "))
self._data_event.clear()
LOGGER.debug("sending cmd: %s", tx_data)
await self._client.write_gatt_char(UUID_TX, tx_data, False)
self._seqcnt += 1
LOGGER.debug("waiting for response")
@@ -171,15 +213,15 @@ class PowerViewBLE:
if len(data) != 9:
LOGGER.debug("not a V2 record!")
return []
pos = int.from_bytes(data[3:5], byteorder="little")
pos2 = (int(data[5]) << 4) + (int(data[4]) >> 4)
pos: Final[int] = int.from_bytes(data[3:5], byteorder="little")
pos2: Final[int] = (int(data[5]) << 4) + (int(data[4]) >> 4)
return [
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
("position2", pos2 >> 2),
("position3", int(data[6])),
("tilt", int(data[7])),
(ATTR_CURRENT_TILT_POSITION, int(data[7])),
("home_id", int.from_bytes(data[0:2], byteorder="little")),
("type_id", int.from_bytes(data[2:3])),
("type_id", int(data[2])),
("is_opening", bool(pos & 0x3 == 0x2)),
("is_closing", bool(pos & 0x3 == 0x1)),
("battery_charging", bool(pos & 0x3 == 0x3)), # observed
@@ -189,34 +231,51 @@ class PowerViewBLE:
]
# position cmd: uint16_t pos1, uint16_t pos2, uint16_t pos3, uint16_t tilt, uint8_t velocity
async def set_position(self, value: int, disconnect: bool = True) -> None:
async def set_position(
self,
pos1: int,
pos2: int = 0x8000,
pos3: int = 0x8000,
tilt: int = 0x8000,
velocity: int = 0x0,
disconnect: bool = True,
) -> None:
"""Set position of device."""
LOGGER.debug("%s setting position to %i", self.name, value)
LOGGER.debug(
"%s setting position to %i/%i/%i, tilt %i, velocity %s",
self.name,
pos1,
pos2,
pos3,
tilt,
velocity,
)
await self._cmd(
(
ShadeCmd.SET_POSITION,
bytearray(
int.to_bytes(value * 100, 2, byteorder="little")
+ bytes([0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x0])
),
int.to_bytes(pos1 * 100, 2, byteorder="little")
+ int.to_bytes(pos2, 2, byteorder="little")
+ int.to_bytes(pos3, 2, byteorder="little")
+ int.to_bytes(tilt, 2, byteorder="little")
+ int.to_bytes(velocity, 1),
),
disconnect,
)
async def open(self) -> None:
async def open(self, velocity: int = 0x0) -> None:
"""Fully open cover."""
LOGGER.debug("%s open", self.name)
await self.set_position(OPEN_POSITION, disconnect=False)
await self.set_position(OPEN_POSITION, velocity=velocity, disconnect=False)
async def stop(self) -> None:
"""Stop device movement."""
LOGGER.debug("%s stop", self.name)
await self._cmd((ShadeCmd.STOP, bytearray(b"")))
await self._cmd((ShadeCmd.STOP, b""))
async def close(self) -> None:
async def close(self, velocity: int = 0x0) -> None:
"""Fully close cover."""
LOGGER.debug("%s close", self.name)
await self.set_position(CLOSED_POSITION, disconnect=False)
await self.set_position(CLOSED_POSITION, velocity=velocity, disconnect=False)
# uint8_t scene#, uint8_t unknown
# open: scene 2
@@ -227,18 +286,19 @@ class PowerViewBLE:
await self._cmd(
(
ShadeCmd.ACTIVATE_SCENE,
bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])),
int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2]),
),
)
def _verify_response(self, din: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
async def identify(self, beeps: int = 0x3) -> None:
"""Identify device."""
LOGGER.debug("%s identify (%i)", self.name, beeps)
await self._cmd((ShadeCmd.IDENTIFY, bytes([min(beeps, 0xFF)])))
def _verify_response(self, data: bytes, seq_nr: int, cmd: ShadeCmd) -> bool:
"""Verify shade response data."""
data: bytearray = din
if self._cipher is not None and self._is_encrypted:
dec = self._cipher.decryptor()
data = bytearray(dec.update(din) + dec.finalize())
if len(data) < 4:
LOGGER.error("Reponse message too short")
LOGGER.error("Response message too short")
return False
if int.from_bytes(data[0:2], byteorder="little") != cmd.value & 0xFFEF:
LOGGER.warning("Response to wrong command")
@@ -252,7 +312,7 @@ class PowerViewBLE:
LOGGER.error("Wrong response data length")
return False
if int(data[4] != 0):
LOGGER.error("Command %d returned error #%d", cmd.value, int(data[4]))
LOGGER.error("Command %X returned error #%d", cmd.value, int(data[4]))
return False
return True
@@ -279,6 +339,9 @@ class PowerViewBLE:
.copy()
.decode("UTF-8")
)
except BleakError as ex:
LOGGER.debug("%s: querying failed: %s", self.name, ex)
raise
finally:
await self.disconnect()
LOGGER.debug("%s device data: %s", self.name, data)
@@ -290,8 +353,17 @@ class PowerViewBLE:
LOGGER.debug("Disconnected from %s", client.address)
def _notification_handler(self, _sender, data: bytearray) -> None:
LOGGER.debug("%s received BLE data: %s", self.name, data)
self._data = data
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
self._data = bytes(data)
if self._cipher is not None and self._is_encrypted:
dec: AEADDecryptionContext = self._cipher.decryptor()
self._data = bytes(dec.update(bytes(data)) + dec.finalize())
LOGGER.debug(
"%s %s",
"decoded data: ".rjust(19 + len(self.name)),
self._data.hex(" "),
)
self._data_event.set()
async def _connect(self) -> None:
@@ -303,15 +375,16 @@ class PowerViewBLE:
LOGGER.debug("%s already connected", self.name)
return
start = time.time()
start: float = time.time()
self._client = await establish_connection(
BleakClient,
self._ble_device,
self.name,
disconnected_callback=self._on_disconnect,
ble_device_callback=lambda: self._ble_device,
services=[
UUID_COV_SERVICE,
# self.UUID_DEV_SERVICE,
UUID_DEV_SERVICE,
# self.UUID_BAT_SERVICE,
],
)

View File

@@ -40,7 +40,9 @@ async def async_setup_entry(
)
class PVBinarySensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySensorEntity): # type: ignore[reportIncompatibleMethodOverride]
class PVBinarySensor(
PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySensorEntity
): # type: ignore[reportIncompatibleMethodOverride]
"""The generic PV binary sensor implementation."""
def __init__(
@@ -49,7 +51,7 @@ class PVBinarySensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySen
descr: BinarySensorEntityDescription,
unique_id: str,
) -> None:
"""Intialize PV binary sensor."""
"""Initialize PV binary sensor."""
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
self._attr_device_info = coord.device_info
self._attr_has_entity_name = True

View File

@@ -0,0 +1,71 @@
"""Hunter Douglas Powerview cover."""
from typing import Final
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.button import (
ButtonDeviceClass,
ButtonEntity,
ButtonEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo, format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DOMAIN, LOGGER
from .coordinator import PVCoordinator
BUTTONS_SHADE: Final = [
ButtonEntityDescription(
key="identify",
device_class=ButtonDeviceClass.IDENTIFY,
entity_category=EntityCategory.DIAGNOSTIC,
),
]
async def async_setup_entry(
_hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the demo cover platform."""
coordinator: PVCoordinator = config_entry.runtime_data
for descr in BUTTONS_SHADE:
async_add_entities([PowerViewButton(coordinator, descr)])
class PowerViewButton(PassiveBluetoothCoordinatorEntity[PVCoordinator], ButtonEntity): # type: ignore[reportIncompatibleVariableOverride]
"""Representation of a powerview shade."""
_attr_has_entity_name = True
_attr_device_class = ButtonDeviceClass.IDENTIFY
def __init__(
self,
coordinator: PVCoordinator,
description: ButtonEntityDescription,
) -> None:
"""Initialize the shade."""
self.entity_description = description
self._coord: PVCoordinator = coordinator
self._attr_device_info = self._coord.device_info
self._attr_unique_id = (
f"{DOMAIN}_{format_mac(self._coord.address)}_{ButtonDeviceClass.IDENTIFY}"
)
super().__init__(coordinator)
@property
def device_info(self) -> DeviceInfo: # type: ignore[reportIncompatibleVariableOverride]
"""Return the device_info of the device."""
return self._coord.device_info
async def async_press(self) -> None:
"""Handle the button press."""
LOGGER.debug("identify cover")
await self._coord.api.identify()

View File

@@ -1,9 +1,15 @@
"""Config flow for BLE Battery Management System integration."""
import asyncio
import base64
import contextlib
from dataclasses import dataclass
import struct
from typing import Any
import aiohttp
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.bluetooth import (
BluetoothServiceInfoBleak,
@@ -11,19 +17,163 @@ from homeassistant.components.bluetooth import (
)
from homeassistant.config_entries import ConfigFlowResult
from homeassistant.const import CONF_ADDRESS
# from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import (
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from .api import UUID_COV_SERVICE as UUID
from .const import DOMAIN, LOGGER, MFCT_ID
from .const import CONF_HOME_KEY, DOMAIN, LOGGER, MFCT_ID
def _needs_encryption(manufacturer_data_hex: str) -> bool:
"""Return True if the BLE advertisement indicates encryption (home_id != 0)."""
data = bytearray.fromhex(manufacturer_data_hex)
if len(data) < 2:
return False
home_id = int.from_bytes(data[0:2], byteorder="little")
return home_id != 0
@dataclass
class HubShadeInfo:
"""Shade metadata from the PowerView hub."""
name: str # Human-readable name (decoded from base64)
ble_name: str # BLE advertisement name, e.g. "DUE:94ED"
async def _fetch_shades_from_hub(
hass: HomeAssistant, hub_url: str
) -> list[HubShadeInfo]:
"""Fetch shade list with human-readable names from a PowerView G3 hub.
Raises aiohttp.ClientError on network errors.
Raises asyncio.TimeoutError on timeout.
"""
session = async_get_clientsession(hass)
timeout = aiohttp.ClientTimeout(total=10)
async with session.get(f"{hub_url}/home/shades", timeout=timeout) as resp:
resp.raise_for_status()
shades = await resp.json(content_type=None)
if not shades:
return []
hub_shades: list[HubShadeInfo] = []
for shade in shades:
ble_name = shade.get("bleName", "")
if not ble_name:
continue
name_b64 = shade.get("name", "")
try:
name = base64.b64decode(name_b64).decode("utf-8") if name_b64 else ble_name
except Exception: # noqa: BLE001
name = ble_name
hub_shades.append(HubShadeInfo(name=name, ble_name=ble_name))
return hub_shades
async def _fetch_key_and_shades_from_hub(
hass: HomeAssistant, hub_url: str
) -> tuple[bytes, list[HubShadeInfo]]:
"""Fetch 16-byte homekey and shade list from a PowerView G3 hub.
Returns (key, shade_list). The key is network-wide so any reachable shade
returns the same value. The shade list contains human-readable names that
can be used to label BLE-discovered devices.
Raises ValueError on protocol/key errors.
Raises aiohttp.ClientError on network errors.
Raises asyncio.TimeoutError on timeout.
"""
hub_shades = await _fetch_shades_from_hub(hass, hub_url)
if not hub_shades:
raise ValueError("No shades found on the hub")
session = async_get_clientsession(hass)
timeout = aiohttp.ClientTimeout(total=10)
# GetShadeKey BLE request: sid=251, cid=18, seqId=1, data_len=0
request_frame = struct.pack("<BBBB", 251, 18, 1, 0)
# Try each shade until one returns a valid key (some may be out of range)
last_error: Exception = ValueError("No shades responded")
for hs in hub_shades:
try:
async with session.post(
f"{hub_url}/home/shades/exec?shades={hs.ble_name}",
json={"hex": request_frame.hex()},
timeout=timeout,
) as resp:
resp.raise_for_status()
result = await resp.json(content_type=None)
except (TimeoutError, aiohttp.ClientError) as ex:
last_error = ex
continue
responses = result.get("responses", [])
if len(responses) != 1 or "hex" not in responses[0]:
continue
response_bytes = bytes.fromhex(responses[0]["hex"])
if len(response_bytes) < 5:
continue
_s, _c, _q, length = struct.unpack("<BBBB", response_bytes[0:4])
if len(response_bytes) != 4 + length:
continue
if response_bytes[4] != 0:
continue
key_data = response_bytes[5:]
if len(key_data) != 16:
continue
return key_data, hub_shades
raise ValueError(f"No reachable shade returned a valid key: {last_error}")
_HOMEKEY_SCHEMA = vol.Schema(
{
vol.Required("key_method", default="hub"): SelectSelector(
SelectSelectorConfig(
options=[
SelectOptionDict(
value="hub",
label="Fetch automatically from PowerView hub",
),
SelectOptionDict(
value="manual",
label="Enter key manually (32 hex characters)",
),
SelectOptionDict(
value="skip",
label="Skip (no key — controls disabled for encrypted shades)",
),
]
)
),
vol.Optional("hub_url", default="http://powerview-g3.local"): TextSelector(
TextSelectorConfig(type=TextSelectorType.URL)
),
vol.Optional("home_key", default=""): TextSelector(
TextSelectorConfig(type=TextSelectorType.TEXT)
),
}
)
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for BT Battery Management System."""
VERSION = 1
MINOR_VERSION = 0
MINOR_VERSION = 1
@dataclass
class DiscoveredDevice:
@@ -37,6 +187,83 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
self._discovered_device: ConfigFlow.DiscoveredDevice | None = None
self._discovered_devices: dict[str, ConfigFlow.DiscoveredDevice] = {}
self._manufacturer_data_hex: str = ""
self._device_name: str = ""
self._home_key: str = ""
self._hub_url: str = ""
self._hub_shades: list[HubShadeInfo] = []
def _create_entry(self) -> ConfigFlowResult:
"""Create the config entry with collected data."""
data: dict[str, str] = {
"manufacturer_data": self._manufacturer_data_hex,
CONF_HOME_KEY: self._home_key,
}
if self._hub_url:
data["hub_url"] = self._hub_url
return self.async_create_entry(
title=self._device_name,
data=data,
)
def _validate_manual_key(
self, user_input: dict[str, Any], errors: dict[str, str]
) -> bool:
"""Validate a manually entered hex key and store it.
Returns True on success, False on validation error.
"""
raw = user_input.get("home_key", "").strip()
if "\\x" in raw:
raw = raw.replace("\\x", "")
if len(raw) != 32:
errors["home_key"] = "invalid_key_length"
return False
try:
bytes.fromhex(raw)
except ValueError:
errors["home_key"] = "invalid_key_format"
return False
self._home_key = raw.lower()
return True
async def _validate_homekey_input(
self, user_input: dict[str, Any], errors: dict[str, str]
) -> bool:
"""Parse and validate homekey user_input, populating self state.
Returns True on success, False on validation error (errors dict is populated).
On skip, self._home_key is set to "".
"""
method = user_input.get("key_method", "skip")
if method == "skip":
self._home_key = ""
return True
if method == "manual":
return self._validate_manual_key(user_input, errors)
if method != "hub":
return False
hub_url = user_input.get("hub_url", "").rstrip("/")
_HUB_ERROR_MAP: dict[type[Exception], str] = {
aiohttp.ClientResponseError: "hub_http_error",
aiohttp.ClientConnectionError: "hub_connection_error",
TimeoutError: "hub_timeout",
ValueError: "hub_protocol_error",
}
try:
key, hub_shades = await _fetch_key_and_shades_from_hub(self.hass, hub_url)
except tuple(_HUB_ERROR_MAP) as ex:
errors["hub_url"] = _HUB_ERROR_MAP[type(ex)]
return False
self._home_key = key.hex()
self._hub_url = hub_url
self._hub_shades = hub_shades
return True
async def async_step_bluetooth(
self, discovery_info: BluetoothServiceInfoBleak
@@ -61,10 +288,17 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
LOGGER.debug("confirm step for %s", self._discovered_device.name)
if user_input is not None:
return self.async_create_entry(
title=self._discovered_device.name,
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
self._manufacturer_data_hex = (
self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()
)
self._device_name = self._discovered_device.name
# Unencrypted shades can skip the homekey step entirely
if not _needs_encryption(self._manufacturer_data_hex):
await self._resolve_friendly_name()
return self._create_entry()
return await self.async_step_homekey_bluetooth()
self._set_confirm_only()
@@ -73,25 +307,153 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
description_placeholders={"name": self._discovered_device.name},
)
async def async_step_homekey_bluetooth(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Configure homekey for a shade discovered via Bluetooth."""
# Reuse an existing key if another shade was already configured
existing = self._existing_home_key()
if existing and user_input is None:
self._home_key = existing
await self._resolve_friendly_name()
return self._create_entry()
errors: dict[str, str] = {}
if user_input is not None and await self._validate_homekey_input(
user_input, errors
):
# Use hub name for the entry title if available
friendly = self._hub_name_for(self._device_name)
if friendly:
self._device_name = friendly
return self._create_entry()
return self.async_show_form(
step_id="homekey_bluetooth",
data_schema=_HOMEKEY_SCHEMA,
errors=errors,
description_placeholders={
"name": self._device_name,
"hub_url_example": "http://powerview-g3.local",
},
)
def _existing_entry_value(self, key: str) -> str:
"""Return the first non-empty value for *key* across configured entries."""
for entry in self._async_current_entries():
if value := entry.data.get(key, ""):
return value
return ""
def _existing_home_key(self) -> str:
"""Return the home_key from any already-configured entry, or ''."""
return self._existing_entry_value(CONF_HOME_KEY)
async def _resolve_friendly_name(self) -> None:
"""Try to resolve BLE device name to hub friendly name."""
hub_url = self._hub_url or self._existing_entry_value("hub_url")
if not hub_url:
return
try:
shades = await _fetch_shades_from_hub(self.hass, hub_url)
for hs in shades:
if hs.ble_name == self._device_name:
self._device_name = hs.name
break
if not self._hub_url:
self._hub_url = hub_url
except (TimeoutError, aiohttp.ClientError, ValueError):
pass
def _hub_name_for(self, ble_name: str) -> str | None:
"""Return the human-readable hub name for a BLE name, or None."""
for hs in self._hub_shades:
if hs.ble_name == ble_name:
return hs.name
return None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the user step to pick discovered device."""
"""Handle the user step — reuse existing key or offer a menu."""
LOGGER.debug("user step")
existing = self._existing_home_key()
if existing:
self._home_key = existing
self._hub_url = self._hub_url or self._existing_entry_value("hub_url")
if self._hub_url and not self._hub_shades:
with contextlib.suppress(
TimeoutError, aiohttp.ClientError, ValueError
):
self._hub_shades = await _fetch_shades_from_hub(
self.hass, self._hub_url
)
return self.async_show_menu(
step_id="user",
menu_options=["select_device", "manual"],
)
return await self.async_step_homekey()
def _build_selected_entries(
self, user_input: dict[str, Any]
) -> list[dict[str, Any]]:
"""Build config entry data for each selected shade address."""
addresses: list[str] = user_input[CONF_ADDRESS]
if isinstance(addresses, str):
addresses = [addresses]
entries: list[dict[str, Any]] = []
for address in addresses:
device = self._discovered_devices[address]
ble_name = device.name
name = self._hub_name_for(ble_name) or ble_name
mfct_hex = device.discovery_info.manufacturer_data[MFCT_ID].hex()
entry_data: dict[str, str] = {
"manufacturer_data": mfct_hex,
CONF_HOME_KEY: self._home_key,
}
if self._hub_url:
entry_data["hub_url"] = self._hub_url
entries.append(
{
"address": address,
"name": name,
"data": entry_data,
}
)
return entries
async def async_step_select_device(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Select one or more BLE-discovered shades, or fall through to manual."""
LOGGER.debug("select_device step")
if user_input is not None:
address = user_input[CONF_ADDRESS]
await self.async_set_unique_id(address, raise_on_progress=False)
self._abort_if_unique_id_configured()
self._discovered_device = self._discovered_devices[address]
entries = self._build_selected_entries(user_input)
self.context["title_placeholders"] = {"name": self._discovered_device.name}
return self.async_create_entry(
title=self._discovered_device.name,
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
# Kick off auto-add flows for all but the last shade
await asyncio.gather(
*(
self.hass.config_entries.flow.async_init(
DOMAIN,
context={"source": "auto_add"},
data=info,
)
for info in entries[:-1]
)
)
# Create the final entry normally (ends this flow)
last = entries[-1]
await self.async_set_unique_id(last["address"], raise_on_progress=False)
self._abort_if_unique_id_configured()
self._device_name = last["name"]
self._manufacturer_data_hex = last["data"]["manufacturer_data"]
self.context["title_placeholders"] = {"name": self._device_name}
return self._create_entry()
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass, False):
address = discovery_info.address
@@ -109,19 +471,96 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
)
if not self._discovered_devices:
return self.async_abort(reason="no_devices_found")
return await self.async_step_manual()
titles = []
titles: list[SelectOptionDict] = []
for address, discovery in self._discovered_devices.items():
titles.append({"value": address, "label": discovery.name})
hub_name = self._hub_name_for(discovery.name)
label = f"{hub_name} ({discovery.name})" if hub_name else discovery.name
titles.append({"value": address, "label": label})
return self.async_show_form(
step_id="user",
step_id="select_device",
data_schema=vol.Schema(
{
vol.Required(CONF_ADDRESS): SelectSelector(
SelectSelectorConfig(options=titles)
SelectSelectorConfig(options=titles, multiple=True)
)
}
),
)
async def async_step_auto_add(
self, discovery_info: dict[str, Any]
) -> ConfigFlowResult:
"""Handle a shade queued from multi-select for individual setup."""
await self.async_set_unique_id(discovery_info["address"])
self._abort_if_unique_id_configured()
self._device_name = discovery_info["name"]
self._manufacturer_data_hex = discovery_info["data"]["manufacturer_data"]
self._home_key = discovery_info["data"].get(CONF_HOME_KEY, "")
self._hub_url = discovery_info["data"].get("hub_url", "")
self.context["title_placeholders"] = {"name": self._device_name}
return await self.async_step_auto_add_confirm()
async def async_step_auto_add_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm adding a shade discovered via multi-select."""
if user_input is not None:
return self._create_entry()
self._set_confirm_only()
return self.async_show_form(
step_id="auto_add_confirm",
description_placeholders={"name": self._device_name},
)
async def async_step_manual(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle manual entry of a BLE device address and name."""
if user_input is not None:
address = user_input[CONF_ADDRESS].upper().strip()
self._device_name = user_input["ble_name"].strip()
await self.async_set_unique_id(address, raise_on_progress=False)
self._abort_if_unique_id_configured()
self.context["title_placeholders"] = {"name": self._device_name}
self._manufacturer_data_hex = ""
return self._create_entry()
return self.async_show_form(
step_id="manual",
data_schema=vol.Schema(
{
vol.Required(CONF_ADDRESS): TextSelector(
TextSelectorConfig(type=TextSelectorType.TEXT)
),
vol.Required("ble_name"): TextSelector(
TextSelectorConfig(type=TextSelectorType.TEXT)
),
}
),
)
async def async_step_homekey(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Configure homekey — collected before device selection."""
errors: dict[str, str] = {}
if user_input is not None and await self._validate_homekey_input(
user_input, errors
):
return await self.async_step_select_device()
return self.async_show_form(
step_id="homekey",
data_schema=_HOMEKEY_SCHEMA,
errors=errors,
description_placeholders={
"hub_url_example": "http://powerview-g3.local",
},
)

View File

@@ -3,25 +3,12 @@
import logging
from typing import Final
# from bleak.uuids import normalize_uuid_str
# from homeassistant.const import ( # noqa: F401
# ATTR_BATTERY_CHARGING,
# ATTR_BATTERY_LEVEL,
# ATTR_TEMPERATURE,
# ATTR_VOLTAGE,
# )
DOMAIN: Final[str] = "hunterdouglas_powerview_ble"
LOGGER: Final = logging.getLogger(__package__)
MFCT_ID: Final[int] = 2073
TIMEOUT: Final[int] = 5
# put the key here, needs to be 16 bytes long, e.g.
# HOME_KEY: Final[bytes] = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
HOME_KEY: Final[bytes] = b""
CONF_HOME_KEY: Final[str] = "home_key"
# attributes (do not change)
ATTR_RSSI: Final[str] = "rssi"

View File

@@ -3,6 +3,7 @@
from typing import Any
from bleak.backends.device import BLEDevice
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth.const import DOMAIN as BLUETOOTH_DOMAIN
from homeassistant.components.bluetooth.passive_update_coordinator import (
@@ -11,27 +12,37 @@ from homeassistant.components.bluetooth.passive_update_coordinator import (
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import CONNECTION_BLUETOOTH, DeviceInfo
from .api import SHADE_TYPE, PowerViewBLE
from .const import ATTR_RSSI, DOMAIN, HOME_KEY, LOGGER
from .api import SHADE_TYPE, PowerViewBLE, ShadeCapability, get_shade_capabilities
from .const import ATTR_RSSI, CONF_HOME_KEY, DOMAIN, LOGGER
class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
"""Update coordinator for a battery management system."""
def __init__(
self, hass: HomeAssistant, ble_device: BLEDevice, data: dict[str, Any]
self,
hass: HomeAssistant,
ble_device: BLEDevice,
data: dict[str, Any],
friendly_name: str | None = None,
) -> None:
"""Initialize BMS data coordinator."""
assert ble_device.name is not None
self._mac = ble_device.address
self.api = PowerViewBLE(ble_device, HOME_KEY)
self._friendly_name = friendly_name or ble_device.name
home_key_hex: str = data.get(CONF_HOME_KEY, "")
home_key: bytes = (
bytes.fromhex(home_key_hex) if len(home_key_hex) == 32 else b""
)
self.api = PowerViewBLE(ble_device, home_key)
self.data: dict[str, int | float | bool] = {}
self._manuf_dat = data.get("manufacturer_data")
self.dev_details: dict[str, str] = {}
self.velocity: int = 0
LOGGER.debug(
"Initializing coordinator for %s (%s)",
ble_device.name,
self._friendly_name,
ble_device.address,
)
super().__init__(
@@ -41,6 +52,18 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
bluetooth.BluetoothScanningMode.ACTIVE,
)
@property
def type_id(self) -> int | None:
"""Return the shade type ID from manufacturer data."""
if self._manuf_dat:
return int(bytes.fromhex(self._manuf_dat)[2])
return None
@property
def shade_capabilities(self) -> ShadeCapability:
"""Return the shade capabilities based on type ID."""
return get_shade_capabilities(self.type_id)
async def query_dev_info(self) -> None:
"""Receive detailed information from device."""
LOGGER.debug("%s: querying device info", self.name)
@@ -49,24 +72,23 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
@property
def device_info(self) -> DeviceInfo:
"""Return detailed device information for GUI."""
LOGGER.debug("%s: device_info, %s", self.name, self.dev_details)
LOGGER.debug("%s: device_info, %s", self._friendly_name, self.dev_details)
return DeviceInfo(
identifiers={
(DOMAIN, self.name),
(DOMAIN, self.address),
(BLUETOOTH_DOMAIN, self.address),
},
connections={(CONNECTION_BLUETOOTH, self.address)},
name=self.name,
name=self._friendly_name,
configuration_url=None,
# properties used in GUI:
manufacturer="Hunter Douglas",
model=(
str(SHADE_TYPE.get(int(bytes.fromhex(self._manuf_dat)[2]), "unknown"))
if self._manuf_dat
str(SHADE_TYPE.get(self.type_id, "unknown"))
if self.type_id is not None
else None
),
model_id=(
str(bytes.fromhex(self._manuf_dat)[2]) if self._manuf_dat else None
str(self.type_id) if self.type_id is not None else None
),
serial_number=self.dev_details.get("serial_nr"),
sw_version=self.dev_details.get("sw_rev"),
@@ -80,7 +102,7 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
def _async_stop(self) -> None:
"""Shutdown coordinator and any connection."""
LOGGER.debug("%s: shuting down BMS device", self.name)
LOGGER.debug("%s: shutting down BMS device", self.name)
self.hass.async_create_task(self.api.disconnect())
super()._async_stop()
@@ -92,10 +114,8 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
) -> None:
"""Handle a Bluetooth event."""
# if not self.dev_details:
# self.hass.async_create_task(self._get_device_info())
LOGGER.debug("BLE event %s: %s", change, service_info.manufacturer_data)
self.api.set_ble_device(service_info.device)
self.data = {ATTR_RSSI: service_info.rssi}
if change == bluetooth.BluetoothChange.ADVERTISEMENT:
self.data.update(

View File

@@ -3,23 +3,26 @@
from typing import Any, Final
from bleak.exc import BleakError
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
ATTR_POSITION,
ATTR_TILT_POSITION,
CoverDeviceClass,
CoverEntity,
CoverEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo, format_mac
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .api import CLOSED_POSITION, OPEN_POSITION
from .const import DOMAIN, HOME_KEY, LOGGER
from .const import DOMAIN, LOGGER
from .coordinator import PVCoordinator
@@ -28,14 +31,23 @@ async def async_setup_entry(
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the demo cover platform."""
"""Set up the cover platform."""
coordinator: PVCoordinator = config_entry.runtime_data
async_add_entities([PowerViewCover(coordinator)])
caps = coordinator.shade_capabilities
if caps.tilt_only:
entities: list[PowerViewCover] = [PowerViewCoverTiltOnly(coordinator)]
elif caps.has_tilt:
entities = [PowerViewCoverTilt(coordinator)]
else:
entities = [PowerViewCover(coordinator)]
async_add_entities(entities)
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
"""Representation of a powerview shade."""
"""Representation of a PowerView shade with Up/Down functionality only."""
_attr_has_entity_name = True
_attr_device_class = CoverDeviceClass.SHADE
@@ -51,8 +63,9 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
coordinator: PVCoordinator,
) -> None:
"""Initialize the shade."""
self._attr_name = CoverDeviceClass.SHADE
self._coord = coordinator
LOGGER.debug("%s: init() PowerViewCover", coordinator.name)
self._attr_name = None
self._coord: PVCoordinator = coordinator
self._attr_device_info = self._coord.device_info
self._target_position: int | None = round(
self._coord.data.get(ATTR_CURRENT_POSITION, OPEN_POSITION)
@@ -62,11 +75,6 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
)
super().__init__(coordinator)
@property
def device_info(self) -> DeviceInfo: # type: ignore[reportIncompatibleVariableOverride]
"""Return the device_info of the device."""
return self._coord.device_info
@property
def is_opening(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is opening or not."""
@@ -96,7 +104,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
def supported_features(self) -> CoverEntityFeature: # type: ignore[reportIncompatibleVariableOverride]
"""Flag supported features, disable control if encryption is needed."""
if (
self._coord.data.get("home_id") and len(HOME_KEY) != 16
self._coord.data.get("home_id") and not self._coord.api.has_key
) or self._coord.data.get("battery_charging"):
return CoverEntityFeature(0)
@@ -122,7 +130,10 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
return
self._target_position = round(target_position)
try:
await self._coord.api.set_position(round(target_position))
await self._coord.api.set_position(
round(target_position),
velocity=self._coord.velocity,
)
self.async_write_ha_state()
except BleakError as err:
LOGGER.error(
@@ -142,7 +153,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
return
try:
self._target_position = OPEN_POSITION
await self._coord.api.open()
await self._coord.api.open(velocity=self._coord.velocity)
self.async_write_ha_state()
except BleakError as err:
LOGGER.error("Failed to open cover '%s': %s", self.name, err)
@@ -155,7 +166,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
return
try:
self._target_position = CLOSED_POSITION
await self._coord.api.close()
await self._coord.api.close(velocity=self._coord.velocity)
self.async_write_ha_state()
except BleakError as err:
LOGGER.error("Failed to close cover '%s': %s", self.name, err)
@@ -170,3 +181,119 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
self.async_write_ha_state()
except BleakError as err:
LOGGER.error("Failed to stop cover '%s': %s", self.name, err)
class PowerViewCoverTilt(PowerViewCover):
"""Representation of a PowerView shade with additional tilt functionality."""
_attr_supported_features = (
CoverEntityFeature.OPEN
| CoverEntityFeature.CLOSE
| CoverEntityFeature.STOP
| CoverEntityFeature.SET_POSITION
| CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.STOP_TILT
| CoverEntityFeature.SET_TILT_POSITION
)
def __init__(
self,
coordinator: PVCoordinator,
) -> None:
"""Initialize the shade with tilt."""
LOGGER.debug("%s: init() PowerViewCoverTilt", coordinator.name)
super().__init__(coordinator)
@property
def current_cover_tilt_position(self) -> int | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return current tilt of cover.
None is unknown
"""
pos: Final = self._coord.data.get(ATTR_CURRENT_TILT_POSITION)
return round(pos) if pos is not None else None
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the tilt to a specific position."""
if isinstance(target_position := kwargs.get(ATTR_TILT_POSITION), int):
LOGGER.debug("set cover tilt to position %i", target_position)
if (
self.current_cover_tilt_position == round(target_position)
or self.current_cover_position is None
):
return
try:
await self._coord.api.set_position(
self.current_cover_position,
tilt=target_position,
velocity=self._coord.velocity,
)
self.async_write_ha_state()
except BleakError as err:
LOGGER.error(
"Failed to tilt cover '%s' to %f%%: %s",
self.name,
target_position,
err,
)
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover."""
await self.async_stop_cover(kwargs=kwargs)
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
LOGGER.debug("open cover tilt")
_kwargs = {**kwargs, ATTR_TILT_POSITION: OPEN_POSITION}
await self.async_set_cover_tilt_position(**_kwargs)
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
LOGGER.debug("close cover tilt")
_kwargs = {**kwargs, ATTR_TILT_POSITION: CLOSED_POSITION}
await self.async_set_cover_tilt_position(**_kwargs)
class PowerViewCoverTiltOnly(PowerViewCoverTilt):
"""Representation of a PowerView shade with additional tilt functionality."""
OPENCLOSED_THRESHOLD = 5
_attr_device_class = CoverDeviceClass.BLIND
_attr_supported_features = (
CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.STOP_TILT
| CoverEntityFeature.SET_TILT_POSITION
)
def __init__(
self,
coordinator: PVCoordinator,
) -> None:
"""Initialize the shade with tilt only."""
LOGGER.debug("%s: init() PowerViewCoverTiltOnly", coordinator.name)
super().__init__(coordinator)
@property
def is_opening(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is opening or not."""
return False
@property
def is_closing(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is closing or not."""
return False
@property
def is_closed(self) -> bool: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is closed."""
return isinstance(self.current_cover_tilt_position, int) and (
self.current_cover_tilt_position
>= OPEN_POSITION - PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
or self.current_cover_tilt_position
<= CLOSED_POSITION + PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
)

View File

@@ -4,8 +4,7 @@
"bluetooth": [
{
"service_uuid": "0000fdc1-0000-1000-8000-00805f9b34fb",
"manufacturer_id": 2073,
"manufacturer_data_start": [0,0]
"manufacturer_id": 2073
}
],
"codeowners": ["@patman15"],
@@ -17,5 +16,5 @@
"issue_tracker": "https://github.com/patman15/hdpv_ble/issues",
"loggers": ["hunterdouglas_powerview_ble"],
"requirements": ["cryptography>=43.0.0"],
"version": "0.22"
"version": "0.24"
}

View File

@@ -0,0 +1,69 @@
"""Hunter Douglas PowerView velocity control."""
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.number import NumberMode, RestoreNumber
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import ConfigEntryType
from .const import DOMAIN, LOGGER
from .coordinator import PVCoordinator
async def async_setup_entry(
_hass: HomeAssistant,
config_entry: ConfigEntryType,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the velocity number entity."""
coordinator: PVCoordinator = config_entry.runtime_data
async_add_entities([PowerViewVelocity(coordinator)])
class PowerViewVelocity(
PassiveBluetoothCoordinatorEntity[PVCoordinator], RestoreNumber
): # type: ignore[reportIncompatibleVariableOverride]
"""Number entity to control shade movement velocity."""
_attr_has_entity_name = True
_attr_name = "Velocity"
_attr_icon = "mdi:speedometer"
_attr_mode = NumberMode.SLIDER
_attr_native_min_value = 0
_attr_native_max_value = 100
_attr_native_step = 1
_attr_entity_category = EntityCategory.CONFIG
def __init__(self, coordinator: PVCoordinator) -> None:
"""Initialize the velocity entity."""
self._coord = coordinator
self._attr_device_info = self._coord.device_info
self._attr_unique_id = (
f"{DOMAIN}_{format_mac(self._coord.address)}_velocity"
)
super().__init__(coordinator)
@property
def native_value(self) -> int:
"""Return the current velocity value."""
return self._coord.velocity
async def async_added_to_hass(self) -> None:
"""Restore last known velocity on startup."""
await super().async_added_to_hass()
last_data = await self.async_get_last_number_data()
if last_data and last_data.native_value is not None:
self._coord.velocity = int(last_data.native_value)
LOGGER.debug(
"%s: restored velocity to %s", self._coord.name, self._coord.velocity
)
async def async_set_native_value(self, value: float) -> None:
"""Set the velocity value."""
self._coord.velocity = int(value)
self.async_write_ha_state()

View File

@@ -3,14 +3,8 @@
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.sensor import (
SensorEntity,
SensorEntityDescription,
)
from homeassistant.components.sensor.const import (
SensorDeviceClass,
SensorStateClass,
)
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
from homeassistant.components.sensor.const import SensorDeviceClass, SensorStateClass
from homeassistant.const import (
ATTR_BATTERY_LEVEL,
PERCENTAGE,
@@ -67,7 +61,7 @@ class PVSensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], SensorEntity):
def __init__(
self, pv_dev: PVCoordinator, descr: SensorEntityDescription, unique_id: str
) -> None:
"""Intitialize the BMS sensor."""
"""Initialize the BMS sensor."""
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
self._attr_device_info = pv_dev.device_info
self.entity_description = descr

View File

@@ -2,10 +2,73 @@
"config": {
"flow_title": "Setup {name}",
"step": {
"user": {
"title": "Add PowerView Shade",
"menu_options": {
"select_device": "Select from discovered shades",
"manual": "Enter device details manually"
},
"menu_option_descriptions": {
"select_device": "Choose from shades detected via Bluetooth nearby.",
"manual": "Enter a Bluetooth MAC address and device name directly, for example if a shade is out of range of discovery."
}
},
"bluetooth_confirm": {
"description": "[%key:component::bluetooth::config::step::bluetooth_confirm::description%]"
},
"homekey": {
"title": "Configure HomeKey",
"description": "All shades on a PowerView network share the same encryption key. The recommended method is to fetch it from your G3 hub. You can also enter the key manually, or skip if your shades are unencrypted.",
"data": {
"key_method": "Key source",
"hub_url": "PowerView hub URL",
"home_key": "HomeKey (32 hex characters or \\xNN format)"
},
"data_description": {
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
}
},
"homekey_bluetooth": {
"title": "Configure HomeKey for {name}",
"description": "This shade uses encryption. The recommended method is to fetch the key from your G3 hub. You can also enter it manually, or skip (controls will be disabled until a key is configured).",
"data": {
"key_method": "Key source",
"hub_url": "PowerView hub URL",
"home_key": "HomeKey (32 hex characters or \\xNN format)"
},
"data_description": {
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
}
},
"auto_add_confirm": {
"description": "Do you want to set up {name}?"
},
"select_device": {
"title": "Select Shades",
"description": "Select the PowerView shades to add via Bluetooth.",
"data": {
"address": "Shades"
}
},
"manual": {
"title": "Enter Device Details",
"description": "Enter the device details manually.",
"data": {
"address": "Bluetooth MAC address (e.g. AA:BB:CC:DD:EE:FF)",
"ble_name": "BLE device name (e.g. DUE:94ED)"
}
}
},
"error": {
"invalid_key_format": "HomeKey must be a valid hexadecimal string",
"invalid_key_length": "HomeKey must be exactly 32 hex characters (16 bytes)",
"hub_connection_error": "Cannot connect to the PowerView hub",
"hub_http_error": "Hub returned an HTTP error",
"hub_timeout": "Connection to hub timed out",
"hub_protocol_error": "Hub returned an unexpected response"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]",

View File

@@ -1,15 +1,75 @@
{
"config": {
"flow_title": "Setup {name}",
"step": {
"user": {
"title": "Add PowerView Shade",
"menu_options": {
"select_device": "Select from discovered shades",
"manual": "Enter device details manually"
},
"menu_option_descriptions": {
"select_device": "Choose from shades detected via Bluetooth nearby.",
"manual": "Enter a Bluetooth MAC address and device name directly, for example if a shade is out of range of discovery."
}
},
"bluetooth_confirm": {
"description": "Do you want to set up {name}?"
},
"homekey": {
"title": "Configure HomeKey",
"description": "All shades on a PowerView network share the same encryption key. The recommended method is to fetch it from your G3 hub. You can also enter the key manually, or skip if your shades are unencrypted.",
"data": {
"key_method": "Key source",
"hub_url": "PowerView hub URL",
"home_key": "HomeKey (32 hex characters or \\xNN format)"
},
"data_description": {
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
}
},
"homekey_bluetooth": {
"title": "Configure HomeKey for {name}",
"description": "This shade uses encryption. The recommended method is to fetch the key from your G3 hub. You can also enter it manually, or skip (controls will be disabled until a key is configured).",
"data": {
"key_method": "Key source",
"hub_url": "PowerView hub URL",
"home_key": "HomeKey (32 hex characters or \\xNN format)"
},
"data_description": {
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
}
},
"select_device": {
"title": "Select Shades",
"description": "Select the PowerView shades to add via Bluetooth.",
"data": {
"address": "Shades"
}
},
"manual": {
"title": "Enter Device Details",
"description": "Enter the device details manually.",
"data": {
"address": "Bluetooth MAC address (e.g. AA:BB:CC:DD:EE:FF)",
"ble_name": "BLE device name (e.g. DUE:94ED)"
}
}
},
"error": {
"invalid_key_format": "HomeKey must be a valid hexadecimal string",
"invalid_key_length": "HomeKey must be exactly 32 hex characters (16 bytes)",
"hub_connection_error": "Cannot connect to the PowerView hub",
"hub_http_error": "Hub returned an HTTP error",
"hub_timeout": "Connection to hub timed out",
"hub_protocol_error": "Hub returned an unexpected response"
},
"abort": {
"already_configured": "Device is already configured",
"no_devices_found": "No devices found on the network",
"not_supported": "Device not supported"
},
"flow_title": "Setup {name}",
"step": {
"bluetooth_confirm": {
"description": "Do you want to set up {name}?"
}
}
}
}

View File

@@ -2,6 +2,12 @@
* Emulate a Hunter Douglas PowerView cover device using ESP32
* used e.g. to gain the home_key from an existing installation via BLE
*
* REQUIRES:
* - ESP32 Board Definitions 3.0.x (tested on 3.0.7)
* - WolfSSL 5.7.x (tested on 5.7.6)
* - Phone Region: Potentially an alternative region depending on the app response
* - e.g. To add Parkland shades in the US, phone region set to the UK temporarily
*
* TODO:
* - cleanup code
* - think about emulating a remote
@@ -11,9 +17,13 @@
*/
#define NAME "myPVcover"
#define FW_VERSION "391"
#define SERIAL_NR "01234567890ABCDEF"
const uint16_t SW_VERSION = 391;
const char *SERIAL_NR = "01234567890ABCDEF";
const uint16_t TYP_ID = 42; // 62
const uint16_t MODEL_ID = 224;
const uint16_t FW_REVISION = 27;
const uint32_t HW_REVISION = 171103;
const uint8_t BATTERY_LEVEL = 42;
#include <BLEDevice.h>
#include <BLEServer.h>
@@ -40,7 +50,11 @@ int devId = INVALID_DEVID; //if not using async INVALID_DEVID is default
#define DEV_SERVICE_UUID BLEUUID("180A")
#define SER_CHAR_UUID BLEUUID("2A25")
#define SWC_CHAR_UUID BLEUUID("2A28")
#define MAN_CHAR_UUID BLEUUID("2A29")
#define MOD_CHAR_UUID BLEUUID("2A24")
#define FWR_CHAR_UUID BLEUUID("2A26")
#define HWR_CHAR_UUID BLEUUID("2A27")
#define SWR_CHAR_UUID BLEUUID("2A28")
#define BAT_SERVICE_UUID BLEUUID("180F")
#define BAT_CHAR_UUID BLEUUID("2A19")
@@ -69,7 +83,7 @@ struct notification {
};
BLECharacteristic *pCharacteristic_cover, *pCharacteristic_fw, *pCharacteristic_unknown, *pCharacteristic_bat;
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser;
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser, *pCharacteristic_man, *pCharacteristic_mod, *pCharacteristic_fwr, *pCharacteristic_hwr;
BLEServer *pServer = NULL;
bool deviceConnected = false;
bool oldDeviceConnected = false;
@@ -158,12 +172,12 @@ void decode(BLECharacteristic *pChar) {
memcpy((void *)&msg, data_dec, 4);
Serial.printf("\t message: SRV: %02x, CMD %02x, SEQ %i, LEN %i\n", msg.serviceID, msg.cmdID, msg.sequence, msg.data_len);
// sepecial responses (static data!)
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x2a, 0xe0, 0x08 }; // HW diagnostics
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
// special responses (static data!)
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, (uint8_t)(SW_VERSION & 0xFF), (uint8_t)(SW_VERSION >> 8), 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, TYP_ID, MODEL_ID, 0x08 }; // HW diagnostics
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
Serial.print("\t\t");
switch ((msg.serviceID << 8) | msg.cmdID) {
@@ -179,7 +193,7 @@ void decode(BLECharacteristic *pChar) {
break;
case 0xF711:
// identify
Serial.printf("identify: %i\n", data_dec[4]);
Serial.printf("identify: %i times\n", data_dec[4]);
resp_size = set_response(&response, (const message *)data_dec);
break;
case 0xF7B8:
@@ -208,8 +222,8 @@ void decode(BLECharacteristic *pChar) {
case 0xFB02:
// set shade key
Serial.print("set shade key: ");
print_hex(&data_raw[4], data_len - 4, "\\x");
// set resonse before key, to acknowledge unencrypted
print_hex(&data_raw[4], data_len - 4, "\\x", "");
// set response before key, to acknowledge unencrypted
resp_size = set_response(&response, (const message *)data_dec);
if (msg.data_len == 16) {
memcpy(home_key, &data_raw[4], 16);
@@ -251,7 +265,9 @@ void decode(BLECharacteristic *pChar) {
resp_size = set_response(&response, (const message *)data_dec);
break;
default:
Serial.println(F("*********************************** unknown message"));
Serial.println(F("*********************************** unknown message (try ACK)"));
resp_size = set_response(&response, (const message *)data_dec);
break;
}
if (resp_size) {
pChar->setValue((uint8_t *)&response, resp_size);
@@ -291,7 +307,7 @@ class coverCallbacks : public BLECharacteristicCallbacks {
void onNotify(BLECharacteristic *pCharacteristic) {
Serial.printf("Cover onNotify() %s\n", pCharacteristic->toString().c_str());
}
void onStatus(BLECharacteristic *pCharacteristic, Status s, uint32_t code) {
Serial.printf("Cover onStatus() %s: %s\n", BLEstate[s], pCharacteristic->toString().c_str());
}
@@ -341,6 +357,7 @@ class genericCallbacks : public BLECharacteristicCallbacks {
void setup() {
Serial.begin(115200);
delay(1000); // wait for terminal to be ready
Serial.println(NAME " initializing ...");
BLEDevice::init(NAME);
@@ -373,8 +390,7 @@ pDesc1->setValue("cover");*/
BAT_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_bat->setCallbacks(new batteryCallbacks());
uint8_t battery_level = 42;
pCharacteristic_bat->setValue(&battery_level, 1);
pCharacteristic_bat->setValue((uint8_t *)&BATTERY_LEVEL, 1);
pCharacteristic_bat->addDescriptor(new BLE2902());
BLEService *pFWService = pServer->createService(FW_SERVICE_UUID);
@@ -390,9 +406,9 @@ pDesc1->setValue("cover");*/
BLEService *pDEVService = pServer->createService(DEV_SERVICE_UUID);
pCharacteristic_dev = pDEVService->createCharacteristic(
SWC_CHAR_UUID,
SWR_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_dev->setValue(FW_VERSION);
pCharacteristic_dev->setValue(String(SW_VERSION));
pCharacteristic_dev->setCallbacks(new genericCallbacks());
pCharacteristic_ser = pDEVService->createCharacteristic(
SER_CHAR_UUID,
@@ -400,6 +416,28 @@ pDesc1->setValue("cover");*/
pCharacteristic_ser->setValue(SERIAL_NR);
pCharacteristic_ser->setCallbacks(new genericCallbacks());
pCharacteristic_man = pDEVService->createCharacteristic(
MAN_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_man->setValue("Hunter Douglas");
pCharacteristic_man->setCallbacks(new genericCallbacks());
pCharacteristic_mod = pDEVService->createCharacteristic(
MOD_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_mod->setValue(String(TYP_ID));
pCharacteristic_mod->setCallbacks(new genericCallbacks());
pCharacteristic_fwr = pDEVService->createCharacteristic(
FWR_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_fwr->setValue(String(FW_REVISION));
pCharacteristic_fwr->setCallbacks(new genericCallbacks());
pCharacteristic_hwr = pDEVService->createCharacteristic(
HWR_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_hwr->setValue(String(HW_REVISION));
pCharacteristic_hwr->setCallbacks(new genericCallbacks());
// Start the services
pCovService->start();
pFWService->start();
@@ -408,9 +446,10 @@ pDesc1->setValue("cover");*/
// Start advertising
BLEAdvertisementData AdvertisementData;
const String manufacturerData = String("\x19\x08\x00\x00\x2A\x00\x00\x00\x00\x00\xA2", 11);
// Hunter Douglas ^^--^^ ^^key^^ ^^ ID-Type
AdvertisementData.setManufacturerData(manufacturerData);
const char adv[] = { 0x19, 0x08, 0x00, 0x00, TYP_ID, 0x00, 0x00, 0x00, 0x00, 0x00, 0xA2 };
// Hunter Douglas ^^ -- ^^ ^^key ^^ ^--pos1--^
AdvertisementData.setManufacturerData(String(adv, 11));
AdvertisementData.setPartialServices(BLEUUID(COVER_SERVICE_UUID));
AdvertisementData.setFlags((1 << 2) | (1 << 1)); // [BR/EDR Not Supported] | [LE General Discoverable Mode]

BIN
img/1200x630wa.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 40 KiB

BIN
img/icon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

BIN
img/icon@2x.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

BIN
img/icon_full.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

BIN
img/logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

BIN
img/logo@2x.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

View File

@@ -1,7 +1,17 @@
# pyproject.toml
[project]
requires-python = ">=3.12.0"
name = "hunterdouglas_powerview_ble"
classifiers = [
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.13"
]
readme = "README.md"
requires-python = ">=3.13.2"
[project.urls]
"Source Code" = "https://github.com/patman15/hdpv_ble/"
"Bug Reports" = "https://github.com/patman15/hdpv_ble/issues"
[tool.pytest.ini_options]
minversion = "8.0"
@@ -14,9 +24,9 @@ testpaths = [
]
asyncio_mode = "auto"
# ruff configuration taken from HA 2024.11.2 (less ignores)
# ruff settings from HA 2025.2.2
[tool.ruff]
required-version = ">=0.6.8"
required-version = ">=0.9.1"
[tool.ruff.lint]
select = [
@@ -35,8 +45,10 @@ select = [
"B017", # pytest.raises(BaseException) should be considered evil
"B018", # Found useless attribute access. Either assign it to a variable or remove it.
"B023", # Function definition does not bind loop variable {name}
"B024", # `{name}` is an abstract base class, but it has no abstract methods or properties
"B026", # Star-arg unpacking after a keyword argument is strongly discouraged
"B032", # Possible unintentional type annotation (using :). Did you mean to assign (using =)?
"B035", # Dictionary comprehension uses static key
"B904", # Use raise from to specify exception cause
"B905", # zip() without an explicit strict= parameter
"BLE",
@@ -70,12 +82,27 @@ select = [
"RSE", # flake8-raise
"RUF005", # Consider iterable unpacking instead of concatenation
"RUF006", # Store a reference to the return value of asyncio.create_task
"RUF007", # Prefer itertools.pairwise() over zip() when iterating over successive pairs
"RUF008", # Do not use mutable default values for dataclass attributes
"RUF010", # Use explicit conversion flag
"RUF013", # PEP 484 prohibits implicit Optional
"RUF016", # Slice in indexed access to type {value_type} uses type {index_type} instead of an integer
"RUF017", # Avoid quadratic list summation
"RUF018", # Avoid assignment expressions in assert statements
"RUF019", # Unnecessary key check before dictionary access
# "RUF100", # Unused `noqa` directive; temporarily every now and then to clean them up
"RUF020", # {never_like} | T is equivalent to T
"RUF021", # Parenthesize a and b expressions when chaining and and or together, to make the precedence clear
"RUF022", # Sort __all__
"RUF023", # Sort __slots__
"RUF024", # Do not pass mutable objects as values to dict.fromkeys
"RUF026", # default_factory is a positional-only argument to defaultdict
"RUF030", # print() call in assert statement is likely unintentional
"RUF032", # Decimal() called with float literal argument
"RUF033", # __post_init__ method with argument defaults
"RUF034", # Useless if-else condition
"RUF100", # Unused `noqa` directive
"RUF101", # noqa directives that use redirected rule codes
"RUF200", # Failed to parse pyproject.toml: {message}
"S102", # Use of exec detected
"S103", # bad-file-permissions
"S108", # hardcoded-temp-file
@@ -88,7 +115,7 @@ select = [
"S317", # suspicious-xml-sax-usage
"S318", # suspicious-xml-mini-dom-usage
"S319", # suspicious-xml-pull-dom-usage
"S320", # suspicious-xmle-tree-usage
# "S320", # suspicious-xmle-tree-usage
"S601", # paramiko-call
"S602", # subprocess-popen-with-shell-equals-true
"S604", # call-with-shell-equals-true
@@ -99,7 +126,7 @@ select = [
"SLOT", # flake8-slots
"T100", # Trace found: {name} used
"T20", # flake8-print
"TCH", # flake8-type-checking
"TC", # flake8-type-checking
"TID", # Tidy imports
"TRY", # tryceratops
"UP", # pyupgrade
@@ -119,13 +146,12 @@ ignore = [
# "PLC1901", # {existing} can be simplified to {replacement} as an empty string is falsey; too many false positives
# "PLR0911", # Too many return statements ({returns} > {max_returns})
# "PLR0912", # Too many branches ({branches} > {max_branches})
# "PLR0913", # Too many arguments to function call ({c_args} > {max_args})
"PLR0913", # Too many arguments to function call ({c_args} > {max_args})
# "PLR0915", # Too many statements ({statements} > {max_statements})
"PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable
# "PLW2901", # Outer {outer_kind} variable {name} overwritten by inner {inner_kind} target
# "PT004", # Fixture {fixture} does not return anything, add leading underscore
# "PT011", # pytest.raises({exception}) is too broad, set the `match` parameter or use a more specific exception
# "PT018", # Assertion should be broken down into multiple parts
"PT018", # Assertion should be broken down into multiple parts
# "RUF001", # String contains ambiguous unicode character.
# "RUF002", # Docstring contains ambiguous unicode character.
# "RUF003", # Comment contains ambiguous unicode character.
@@ -136,14 +162,12 @@ ignore = [
# "SIM115", # Use context handler for opening files
# Moving imports into type-checking blocks can mess with pytest.patch()
"TCH001", # Move application import {} into a type-checking block
"TCH002", # Move third-party import {} into a type-checking block
"TCH003", # Move standard library import {} into a type-checking block
"TC001", # Move application import {} into a type-checking block
"TC002", # Move third-party import {} into a type-checking block
"TC003", # Move standard library import {} into a type-checking block
"TRY003", # Avoid specifying long messages outside the exception class
"TRY400", # Use `logging.exception` instead of `logging.error`
# Ignored due to performance: https://github.com/charliermarsh/ruff/issues/2923
"UP038", # Use `X | Y` in `isinstance` call instead of `(X, Y)`
# May conflict with the formatter, https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
"W191",
@@ -161,3 +185,14 @@ ignore = [
"PLE0605"
]
[tool.ruff.lint.isort]
force-sort-within-sections = true
known-first-party = [
"homeassistant",
]
combine-as-imports = true
split-on-trailing-comma = false
[tool.ruff.lint.per-file-ignores]
"scripts/*" = ["T201"]

View File

@@ -1,4 +1,6 @@
homeassistant==2024.11.0
homeassistant==2025.11.0
pip>=21.3.1
ruff==0.6.8
ruff>=0.9.1,<=0.15.0
types-requests
mypy~=1.19.1
codespell

View File

@@ -2,9 +2,9 @@
wheel
home-assistant-bluetooth
habluetooth>=3.6.0
habluetooth>=5.3.0
bluetooth-adapters
pytest>=8.3.3
pytest>=8.4.1
pytest-cov>=5.0.0
pytest-socket>=0.7.0
pytest-asyncio>=0.24.0
@@ -16,10 +16,10 @@ aiohttp
aiohttp_cors
aiohttp-fast-url-dispatcher
aiohttp-zlib-ng
bleak>=0.22.3
bleak-retry-connector>=3.6.0
bleak>=1.0.1
bleak-retry-connector>=4.4.3
bluetooth-data-tools
pyserial-asyncio
pyudev
pytest-homeassistant-custom-component==0.13.181
pytest-homeassistant-custom-component==0.13.294

1
scripts/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Script to extract PowerView homekey from a G3 PowerView Gateway."""

View File

@@ -0,0 +1,119 @@
"""Extract PowerView homekey from a G3 PowerView Gateway."""
import base64
import json
import struct
from typing import Any, Final
import requests
HUB: Final[str] = "http://powerview-g3.local"
TIMEOUT: Final[int] = 10
def create_request(sid: int, cid: int, sequence_id: int, data: bytes) -> bytes:
"""Assemble a request frame for the PowerView protocol."""
return struct.pack("<BBBB", sid, cid, sequence_id, len(data)) + data
def decode_response(packet: bytes) -> dict[str, Any]:
"""Decode a response frame from the PowerView protocol."""
if len(packet) < 4:
raise ValueError("Packet size too small")
sid, cid, sequence_id, length = struct.unpack("<BBBB", packet[0:4])
if len(packet) != 4 + length:
raise ValueError("Not all data present")
if length < 1:
raise ValueError("No errorCode present")
(error_code,) = struct.unpack("<B", packet[4:5])
data: Final[bytes] = packet[5:]
return {
"cid": cid,
"sid": sid,
"sequenceId": sequence_id,
"errorCode": error_code,
"data": data,
}
def create_get_shade_key_request(sequence_id) -> bytes:
"""Create a GetShadeKey request frame."""
return create_request(251, 18, sequence_id, b"")
def get_shade_key(hub: str, ble_name) -> bytes:
"""Get the homekey for a shade."""
try:
shades_exec_resp: requests.Response = requests.post(
hub + "/home/shades/exec?shades=" + ble_name,
json={"hex": create_get_shade_key_request(1).hex()},
timeout=TIMEOUT,
)
shades_exec_resp.raise_for_status()
except requests.exceptions.RequestException as ex:
print(f"Unable to send GetShadeKey {ex!s}")
raise
result: dict = json.loads(shades_exec_resp.content)
responses = result.get("responses", [])
if len(responses) != 1 or "hex" not in responses[0]:
raise OSError(f"Error when attempting GetShadeKey: {result}")
response: Final[bytes] = bytes.fromhex(responses[0]["hex"])
dec_resp: Final[dict[str, Any]] = decode_response(response)
if dec_resp["errorCode"] != 0:
raise ValueError(
f"BLE errorCode={dec_resp['errorCode']} data={dec_resp['data'].hex()}"
)
if len(dec_resp["data"]) != 16:
raise ValueError("Expected 16 byte homekey")
return dec_resp["data"]
def main(hub: str) -> int:
"""Extract the homekeys from all shades."""
try:
shades_resp: requests.Response = requests.get(
hub + "/home/shades", timeout=TIMEOUT
)
shades_resp.raise_for_status()
except requests.exceptions.RequestException as ex:
print(f"Unable to get list of shades:\n\t{ex!s}")
return -1
shades = json.loads(shades_resp.content)
print(f"Found {len(shades)} shades, interrogating")
network_key: bytes | None = None
for shade in shades:
name: str = base64.b64decode(shade["name"]).decode("utf-8")
try:
key: bytes = get_shade_key(hub, shade["bleName"])
network_key = key
except (OSError, ValueError) as ex:
if network_key is not None:
key = network_key
print(f"Shade '{name}':")
print(f"\tBLE name: '{shade['bleName']}'")
print(f"\tHomeKey: {key.hex()} (shade unreachable, using network key)")
else:
print(f"Shade '{name}':")
print(f"\tBLE name: '{shade['bleName']}'")
print(f"\tHomeKey: ERROR - {ex}")
continue
print(f"Shade '{name}':")
print(f"\tBLE name: '{shade['bleName']}'")
print(f"\tHomeKey: {key.hex()}")
return 0
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser(
description="Extract PowerView homekey from a G3 PowerView Gateway"
)
parser.add_argument("hub", nargs="?", help="URL to HUB", default=HUB)
args = parser.parse_args()
sys.exit(main(**vars(args)))