11 Commits

Author SHA1 Message Date
patman15
49bb1e79c9 add tilt functions 2025-01-07 09:50:41 +01:00
patman15
d73bb1d1c4 completed tilt control 2025-01-06 19:46:52 +01:00
patman15
e00e04159a support setting tilt 2025-01-05 19:31:26 +01:00
patman15
ea33e08da0 extend set_position() for all values 2025-01-05 12:12:16 +01:00
patman15
8c5c3d3c9a Update .gitignore 2024-12-29 13:00:15 +01:00
patman15
0299a4ed24 Merge branch 'main' into feature/type_8 2024-12-29 12:49:44 +01:00
patman15
21d31ba38a Merge branch 'main' into feature/type_8 2024-12-22 13:36:41 +01:00
patman15
b6b7e43b02 stronger typing 2024-12-22 13:35:29 +01:00
patman15
b550f98e2b Merge branch 'main' into feature/type_8 2024-12-21 09:10:50 +01:00
patman15
f6460280db Merge branch 'main' into feature/type_8 2024-12-20 21:16:37 +01:00
patman15
65b7b3814b disabled home_id filter 2024-12-20 20:41:01 +01:00
28 changed files with 356 additions and 1330 deletions

View File

@@ -8,8 +8,8 @@ on:
- cron: '0 5 * * 6'
jobs:
lint:
name: "Lint the code"
ruff:
name: "Ruff"
runs-on: "ubuntu-latest"
steps:
- name: "Checkout the repository"
@@ -18,17 +18,11 @@ jobs:
- name: "Set up Python"
uses: actions/setup-python@main
with:
python-version: "3.13.2"
python-version: "3.12"
cache: "pip"
- name: "Install requirements"
run: python3 -m pip install -r requirements.txt
- name: "Run ruff"
run: ruff check .
- name: "Run mypy"
run: mypy .
- name: "Run codespell"
run: codespell -L hass
- name: "Run Ruff"
run: python3 -m ruff check .

3
.gitignore vendored
View File

@@ -152,3 +152,6 @@ cython_debug/
#.idea/
aes.py
*.bak
emu/PV_BLE_cover/PV_BLE_cover.ino
img/
*.zip

View File

@@ -8,7 +8,8 @@
> [!WARNING]
> - This integration is under development!
> - Test coverage is low, malfunction might occur.
> - The HOME_KEY is lost over updates!
> - Only devices that are **not** added to the app are controlable. It is possible to add them to the app if you just want to monitor the status (position, battery) in Home Assistant.
> - Currently only position change is supported (e.g., no tilt)
## Features
- Zero configuration
@@ -17,7 +18,7 @@
### Supported Devices
Type* | Description
-- | --
-- | --
1 | Designer Roller
4 | Roman
5 | Bottom Up
@@ -25,7 +26,6 @@ Type* | Description
10 | Duette and Applause SkyLift
19 | Provenance Woven Wood
31, 32, 84 | Vignette
39 | Parkland
42 | M25T Roller Blind
49 | AC Roller
52 | Banded Shades
@@ -39,14 +39,10 @@ The integration provides the following information about the battery
Platform | Description | Unit | Details
-- | -- | -- | --
`binary_sensor` | battery charging indicator | `bool` | true if battery is charging
`button` | identify shade | - | identify shade by LED and 3 beeps
`cover` | view/control position | `%` | percentage cover is open (100% is open)
`sensor` | SoC (state of charge) | `%` | range 100% (full), 50%, 20%, 0% (battery empty)
## Installation
> [!IMPORTANT]
> In case you added your shades to the app or a gateway, you need to [set the encryption key](#set-the-encryption-key) manually in the [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py) file after **each** update!
### Automatic
Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom repository](https://hacs.xyz/docs/faq/custom_repositories/).
@@ -61,22 +57,12 @@ Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom rep
1. Restart Home Assistant
1. In the HA UI go to "Configuration" -> "Integrations" click "+" and search for "Hunter Douglas PowerView (BLE)"
## Set the Encryption Key
Currently, there are three methods to obtain the key:
1. Via adopting a BLE shade: There is a [shade emulator](/emu/PV_BLE_cover) that works with Arduino IDE and an ESP32 device (≥ 2MiB flash, ≥ 128KiB required), e.g. [Adafruit QT Py ESP32-S3](https://www.adafruit.com/product/5426). Install and connect via serial port, then go to the PowerView app and add the shade `myPVcover` to your home. You will see a log message `set shade key: \xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx` . Copy this key. You can delete the shade from the app when done.
2. Extracting from gateway: This [script](scripts/extract_gateway3_homekey.py) is able to extract the key from a working PowerView gateway.
3. Grabbing from the app: Checkout this [post in the Home Assistant community forum](https://community.home-assistant.io/t/hunter-douglas-powerview-gen-3-integration/424836/228).
Finally, you need to manually copy the key to [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py).
> [!IMPORTANT]
> You need to update the file after **each** update!
## Known Issues
<details><summary>Shade inoperable after charging</summary>
It seems that the shades require some re-initialization after charging. The solution is currently unknown, but as a workaround you can operate the shade ones using the vendor app.
</details>
## Outlook
- Add support for encryption
- Allow parallel usage to PowerView app as "remote"
- Add support for tilt function
- Add support for further device types
## Troubleshooting
In case you have severe troubles,
@@ -86,15 +72,6 @@ In case you have severe troubles,
- disable the log (Home Assistant will prompt you to download the log), and finally
- [open an issue](https://github.com/patman15/hdpv_ble/issues/new?assignees=&labels=Bug&projects=&template=bug.yml) with a good description of what happened and attach the log.
# Thanks To
[@mannkind](https://github.com/mannkind)
[license-shield]: https://img.shields.io/github/license/patman15/hdpv_ble.svg?style=for-the-badge
[releases-shield]: https://img.shields.io/github/release/patman15/hdpv_ble.svg?style=for-the-badge
[releases]: https://github.com//patman15/hdpv_ble/releases
## Outlook
- Add tests!
- Allow parallel usage to PowerView app as "remote"
- Add support for tilt function
- Add support for further device types

View File

@@ -4,194 +4,48 @@
@license: Apache-2.0 license
"""
import base64
from collections.abc import Callable
import aiohttp
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth import (
BluetoothCallbackMatcher,
BluetoothScanningMode,
BluetoothServiceInfoBleak,
async_ble_device_from_address,
async_discovered_service_info,
)
from homeassistant.components.bluetooth import async_ble_device_from_address
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
from .api import UUID_COV_SERVICE as UUID
from .const import CONF_HUB_URL, LOGGER, MFCT_ID, SIGNAL_NEW_SHADE
from .const import DOMAIN, LOGGER
from .coordinator import PVCoordinator
PLATFORMS: list[Platform] = [
Platform.BINARY_SENSOR,
Platform.BUTTON,
Platform.COVER,
Platform.NUMBER,
Platform.SENSOR,
]
PLATFORMS: list[Platform] = [Platform.BINARY_SENSOR, Platform.COVER, Platform.SENSOR]
type HubRuntimeData = dict[str, PVCoordinator]
type ConfigEntryType = ConfigEntry[HubRuntimeData]
type AddEntitiesFn = Callable[[PVCoordinator, AddEntitiesCallback], None]
def async_setup_shade_platform(
hass: HomeAssistant,
config_entry: ConfigEntryType,
async_add_entities: AddEntitiesCallback,
add_fn: AddEntitiesFn,
) -> None:
"""Set up a platform for all current and future shades."""
for coordinator in config_entry.runtime_data.values():
add_fn(coordinator, async_add_entities)
@callback
def _async_new_shade(coordinator: PVCoordinator) -> None:
add_fn(coordinator, async_add_entities)
config_entry.async_on_unload(
async_dispatcher_connect(
hass,
SIGNAL_NEW_SHADE.format(entry_id=config_entry.entry_id),
_async_new_shade,
)
)
async def _fetch_shade_names(
hass: HomeAssistant, hub_url: str
) -> dict[str, str]:
"""Fetch BLE name -> friendly name mapping from the hub.
Returns empty dict on failure.
"""
session = async_get_clientsession(hass)
timeout = aiohttp.ClientTimeout(total=10)
try:
async with session.get(f"{hub_url}/home/shades", timeout=timeout) as resp:
resp.raise_for_status()
shades = await resp.json(content_type=None)
except (TimeoutError, aiohttp.ClientError, ValueError):
return {}
names: dict[str, str] = {}
for shade in shades or []:
ble_name = shade.get("bleName", "")
if not ble_name:
continue
name_b64 = shade.get("name", "")
try:
name = base64.b64decode(name_b64).decode("utf-8") if name_b64 else ble_name
except Exception: # noqa: BLE001
name = ble_name
names[ble_name] = name
return names
async def _async_setup_shade(
hass: HomeAssistant,
entry: ConfigEntryType,
service_info: BluetoothServiceInfoBleak,
shade_names: dict[str, str],
) -> None:
"""Create a coordinator for a newly discovered shade."""
address = service_info.address
if address in entry.runtime_data:
return
ble_device: BLEDevice | None = async_ble_device_from_address(
hass=hass, address=address, connectable=True
)
if not ble_device:
LOGGER.debug("BLE device %s not connectable, skipping", address)
return
friendly_name = shade_names.get(service_info.name, service_info.name)
coordinator = PVCoordinator(
hass, ble_device, entry.data.copy(), friendly_name
)
entry.runtime_data[address] = coordinator
entry.async_on_unload(coordinator.async_start())
async_dispatcher_send(
hass,
SIGNAL_NEW_SHADE.format(entry_id=entry.entry_id),
coordinator,
)
# Query device info in background — don't block entry setup
try:
await coordinator.query_dev_info()
except BleakError:
LOGGER.warning(
"Could not query device info for %s (%s)",
friendly_name,
address,
)
type ConfigEntryType = ConfigEntry[PVCoordinator]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool:
"""Set up PowerView Home from a config entry."""
"""Set up BT Battery Management System from a config entry."""
LOGGER.debug("Setup of %s", repr(entry))
entry.runtime_data = {}
if entry.unique_id is None:
raise ConfigEntryError("Missing unique ID for device.")
# Resolve shade friendly names from hub if available
hub_url = entry.data.get(CONF_HUB_URL, "")
shade_names: dict[str, str] = {}
if hub_url:
shade_names = await _fetch_shade_names(hass, hub_url)
# Forward platforms first so dispatched entities have their setup ready
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
# Kick off shade setup for already-discovered BLE devices (non-blocking)
for service_info in async_discovered_service_info(hass, connectable=True):
if (
MFCT_ID in service_info.manufacturer_data
and UUID in service_info.service_uuids
):
hass.async_create_task(
_async_setup_shade(hass, entry, service_info, shade_names)
)
# Register for future BLE discoveries
def _async_discovered_device(
service_info: BluetoothServiceInfoBleak,
change: bluetooth.BluetoothChange,
) -> None:
if service_info.address not in entry.runtime_data:
hass.async_create_task(
_async_setup_shade(hass, entry, service_info, shade_names)
)
entry.async_on_unload(
bluetooth.async_register_callback(
hass,
_async_discovered_device,
BluetoothCallbackMatcher(
service_uuid=UUID,
manufacturer_id=MFCT_ID,
),
BluetoothScanningMode.ACTIVE,
)
ble_device = async_ble_device_from_address(
hass=hass, address=entry.unique_id, connectable=True
)
if not ble_device:
raise ConfigEntryNotReady(
f"Could not find PowerView device ({entry.unique_id}) via Bluetooth"
)
coordinator = PVCoordinator(hass, ble_device, entry.data.copy())
try:
await coordinator.query_dev_info()
except BleakError as err:
raise ConfigEntryNotReady("Unable to query device info.") from err
# Insert the coordinator in the global registry
hass.data.setdefault(DOMAIN, {})
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(coordinator.async_start())
return True
@@ -199,8 +53,20 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntryType) -> boo
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
entry.runtime_data.clear()
LOGGER.debug("Unloaded config entry: %s, ok? %s!", entry.unique_id, str(unload_ok))
return unload_ok
async def async_migrate_entry(
_hass: HomeAssistant, config_entry: ConfigEntryType
) -> bool:
"""Migrate old entry."""
if config_entry.version > 1:
# This means the user has downgraded from a future version
LOGGER.debug("Cannot downgrade from version %s", config_entry.version)
return False
LOGGER.debug("Migrating from version %s", config_entry.version)
return False

View File

@@ -1,10 +1,10 @@
"""Hunter Douglas PowerView BLE API."""
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
import time
from typing import Final, NamedTuple
from typing import Final
from bleak import BleakClient
from bleak.backends.device import BLEDevice
@@ -16,7 +16,6 @@ from cryptography.hazmat.primitives.ciphers.base import (
AEADDecryptionContext,
AEADEncryptionContext,
)
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
@@ -40,88 +39,23 @@ SHADE_TYPE: Final[dict[int, str]] = {
6: "Duette",
10: "Duette and Applause SkyLift",
19: "Provenance Woven Wood",
26: "Vertical",
31: "Vignette",
32: "Vignette",
42: "M25T Roller Blind",
49: "AC Roller",
52: "Banded Shades",
53: "Sonnette",
57: "Carole Roman Shades",
84: "Vignette",
# top down (single rail, inverted position)
7: "Top Down",
# top down bottom up (dual rail)
# top down bottom up
8: "Duette, Top Down Bottom Up",
9: "Duette DuoLite, Top Down Bottom Up",
33: "Duette Architella, Top Down Bottom Up",
47: "Pleated, Top Down Bottom Up",
# tilt only (no position movement)
39: "Parkland",
40: "Everwood Alternative Wood Blinds",
# tilt on closed
18: "Bottom Up, Tilt on Closed 90°",
44: "Twist",
# tilt anywhere (position + tilt)
# top down, tilt anywhere
51: "Venetian, Tilt Anywhere",
54: "Vertical Slats, Left Stack",
55: "Vertical Slats, Right Stack",
56: "Vertical Slats, Split Stack",
62: "Venetian, Tilt Anywhere",
# duolite (dual overlapping fabrics)
38: "Dual Overlapped, Tilt 90°",
65: "Dual Overlapped",
95: "Dual Overlapped Illuminated",
}
class ShadeCapability(NamedTuple):
"""Capability flags for a shade type."""
has_tilt: bool = False
tilt_only: bool = False
is_tilt_on_closed: bool = False # tilt only available when fully closed
is_top_down: bool = False # position logic is inverted (SkyLift style)
is_tdbu: bool = False # dual-rail Top Down Bottom Up (needs two entities)
is_duolite: bool = False # dual-fabric sheer+opaque (needs three entities)
SHADE_CAPABILITIES: Final[dict[int, ShadeCapability]] = {
# tilt anywhere (position + tilt)
51: ShadeCapability(has_tilt=True),
54: ShadeCapability(has_tilt=True),
55: ShadeCapability(has_tilt=True),
56: ShadeCapability(has_tilt=True),
62: ShadeCapability(has_tilt=True),
# tilt only (no position movement)
39: ShadeCapability(has_tilt=True, tilt_only=True),
40: ShadeCapability(has_tilt=True, tilt_only=True),
# tilt on closed (tilt only available at fully closed position)
18: ShadeCapability(has_tilt=True, is_tilt_on_closed=True),
44: ShadeCapability(has_tilt=True, is_tilt_on_closed=True),
# top-down only (single rail, inverted position)
7: ShadeCapability(is_top_down=True),
10: ShadeCapability(is_top_down=True),
# dual-rail top-down/bottom-up (two independent rails → two entities)
8: ShadeCapability(is_tdbu=True),
33: ShadeCapability(is_tdbu=True),
47: ShadeCapability(is_tdbu=True),
# duolite (dual overlapping fabrics → three entities)
9: ShadeCapability(is_tdbu=True, is_duolite=True),
38: ShadeCapability(is_duolite=True),
65: ShadeCapability(is_duolite=True),
95: ShadeCapability(is_duolite=True),
}
_DEFAULT_CAPABILITY: Final[ShadeCapability] = ShadeCapability()
def get_shade_capabilities(type_id: int | None) -> ShadeCapability:
"""Return shade capabilities for a given type_id."""
if type_id is None:
return _DEFAULT_CAPABILITY
return SHADE_CAPABILITIES.get(type_id, _DEFAULT_CAPABILITY)
OPEN_POSITION: Final[int] = 100
CLOSED_POSITION: Final[int] = 0
@@ -161,18 +95,26 @@ class PowerViewBLE:
def __init__(self, ble_device: BLEDevice, home_key: bytes = b"") -> None:
"""Initialize device API via Bluetooth."""
self._ble_device: BLEDevice = ble_device
self._ble_device: Final[BLEDevice] = ble_device
self.name: Final[str] = self._ble_device.name or "unknown"
self._seqcnt: int = 1
self._client: BleakClient = BleakClient(self._ble_device)
self._client: BleakClient = BleakClient(
self._ble_device,
disconnected_callback=self._on_disconnect,
services=[
UUID_COV_SERVICE,
# self.UUID_DEV_SERVICE,
# self.UUID_BAT_SERVICE,
],
)
self._data_event = asyncio.Event()
self._data: bytes = b""
self._data: bytearray = bytearray()
self._info: PVDeviceInfo = PVDeviceInfo()
self._is_encrypted: bool = False
self._cmd_lock: Final = asyncio.Lock()
self._cmd_next: tuple[ShadeCmd, bytes]
self._is_encrypted: bool = False
self._cipher: Final[Cipher | None] = (
Cipher(algorithms.AES(home_key), modes.CTR(bytes(16)))
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
if len(home_key) == 16
else None
)
@@ -181,10 +123,6 @@ class PowerViewBLE:
await self._data_event.wait()
self._data_event.clear()
def set_ble_device(self, ble_device: BLEDevice) -> None:
"""Update the BLE device reference (e.g. when proxy details change)."""
self._ble_device = ble_device
@property
def encrypted(self) -> bool:
"""Return whether communication with this shade is encrypted."""
@@ -194,11 +132,6 @@ class PowerViewBLE:
def encrypted(self, value: bool) -> None:
self._is_encrypted = value
@property
def has_key(self) -> bool:
"""Return True if a valid homekey was provided."""
return self._cipher is not None
@property
def info(self) -> PVDeviceInfo:
"""Return device information, e.g. SW version."""
@@ -247,81 +180,72 @@ class PowerViewBLE:
raise
@staticmethod
def dec_manufacturer_data(data: bytearray) -> dict[str, float | int | bool]:
def dec_manufacturer_data(data: bytearray) -> list[tuple[str, float]]:
"""Decode manufacturer data from BLE advertisement V2."""
if len(data) != 9:
LOGGER.debug("not a V2 record!")
return {}
# data[3] lower 2 bits are status flags; pos is in bits 2-7 of data[3]
# and bits 0-3 of data[4]. Read flags before extracting position so
# the masking below doesn't accidentally overwrite them.
flags: Final[int] = data[3] & 0x3
# Mask pos2 bits (upper nibble of data[4]) out before forming the
# 10-bit position value, otherwise a non-zero top-rail position on
# TDBU shades contaminates the bottom-rail reading.
pos: Final[int] = ((data[4] & 0x0F) << 6) | ((data[3] >> 2) & 0x3F)
return []
pos: Final[int] = int.from_bytes(data[3:5], byteorder="little")
pos2: Final[int] = (int(data[5]) << 4) + (int(data[4]) >> 4)
return {
ATTR_CURRENT_POSITION: pos / 10,
"position2": pos2 >> 2,
"position3": int(data[6]),
ATTR_CURRENT_TILT_POSITION: int(data[7]),
"home_id": int.from_bytes(data[0:2], byteorder="little"),
"type_id": int(data[2]),
"is_opening": bool(flags == 0x2),
"is_closing": bool(flags == 0x1),
"battery_charging": bool(flags == 0x3), # observed
"battery_level": POWER_LEVELS[(data[8] >> 6)], # cannot hit 4
"resetMode": bool(data[8] & 0x1),
"resetClock": bool(data[8] & 0x2),
}
return [
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
("position2", pos2 >> 2),
("position3", int(data[6])),
(ATTR_CURRENT_TILT_POSITION, int(data[7])),
("home_id", int.from_bytes(data[0:2], byteorder="little")),
("type_id", int.from_bytes(data[2:3])),
("is_opening", bool(pos & 0x3 == 0x2)),
("is_closing", bool(pos & 0x3 == 0x1)),
("battery_charging", bool(pos & 0x3 == 0x3)), # observed
("battery_level", POWER_LEVELS[(data[8] >> 6)]), # cannot hit 4
("resetMode", bool(data[8] & 0x1)),
("resetClock", bool(data[8] & 0x2)),
]
# position cmd: uint16_t pos1, uint16_t pos2, uint16_t pos3, uint16_t tilt, uint8_t velocity
async def set_position(
self,
pos1: int,
pos2: int = 0x8000,
pos3: int = 0x8000,
tilt: int = 0x8000,
pos2: int | None = None,
pos3: int | None = None,
tilt: int | None = None,
velocity: int = 0x0,
disconnect: bool = True,
) -> None:
"""Set position of device."""
LOGGER.debug(
"%s setting position to %i/%i/%i, tilt %i, velocity %s",
self.name,
pos1,
pos2,
pos3,
tilt,
velocity,
)
LOGGER.debug("%s setting position to %i, tilt %i", self.name, pos1, tilt)
await self._cmd(
(
ShadeCmd.SET_POSITION,
int.to_bytes(pos1 * 100, 2, byteorder="little")
+ int.to_bytes(pos2, 2, byteorder="little")
+ int.to_bytes(pos3, 2, byteorder="little")
+ int.to_bytes(tilt, 2, byteorder="little")
int.to_bytes(pos1, 2, byteorder="little")
+ int.to_bytes(
pos2 if pos2 is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(
pos3 if pos3 is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(
tilt if tilt is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(velocity, 1),
),
disconnect,
)
async def open(self, velocity: int = 0x0) -> None:
async def open(self) -> None:
"""Fully open cover."""
LOGGER.debug("%s open", self.name)
await self.set_position(OPEN_POSITION, velocity=velocity, disconnect=False)
await self.set_position(OPEN_POSITION, disconnect=False)
async def stop(self) -> None:
"""Stop device movement."""
LOGGER.debug("%s stop", self.name)
await self._cmd((ShadeCmd.STOP, b""))
async def close(self, velocity: int = 0x0) -> None:
async def close(self) -> None:
"""Fully close cover."""
LOGGER.debug("%s close", self.name)
await self.set_position(CLOSED_POSITION, velocity=velocity, disconnect=False)
await self.set_position(CLOSED_POSITION, disconnect=False)
# uint8_t scene#, uint8_t unknown
# open: scene 2
@@ -336,15 +260,14 @@ class PowerViewBLE:
),
)
async def identify(self, beeps: int = 0x3) -> None:
"""Identify device."""
LOGGER.debug("%s identify (%i)", self.name, beeps)
await self._cmd((ShadeCmd.IDENTIFY, bytes([min(beeps, 0xFF)])))
def _verify_response(self, data: bytes, seq_nr: int, cmd: ShadeCmd) -> bool:
def _verify_response(self, din: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
"""Verify shade response data."""
data: bytearray = din
if self._cipher is not None and self._is_encrypted:
dec: AEADDecryptionContext = self._cipher.decryptor()
data = bytearray(dec.update(din) + dec.finalize())
if len(data) < 4:
LOGGER.error("Response message too short")
LOGGER.error("Reponse message too short")
return False
if int.from_bytes(data[0:2], byteorder="little") != cmd.value & 0xFFEF:
LOGGER.warning("Response to wrong command")
@@ -400,10 +323,10 @@ class PowerViewBLE:
def _notification_handler(self, _sender, data: bytearray) -> None:
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
self._data = bytes(data)
self._data = data
if self._cipher is not None and self._is_encrypted:
dec: AEADDecryptionContext = self._cipher.decryptor()
self._data = bytes(dec.update(bytes(data)) + dec.finalize())
self._data = bytearray(dec.update(data) + dec.finalize())
LOGGER.debug(
"%s %s",
"decoded data: ".rjust(19 + len(self.name)),
@@ -427,7 +350,6 @@ class PowerViewBLE:
self._ble_device,
self.name,
disconnected_callback=self._on_disconnect,
ble_device_callback=lambda: self._ble_device,
services=[
UUID_COV_SERVICE,
UUID_DEV_SERVICE,

View File

@@ -13,7 +13,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import ConfigEntryType, async_setup_shade_platform
from . import ConfigEntryType
from .const import DOMAIN
from .coordinator import PVCoordinator
@@ -26,30 +26,21 @@ BINARY_SENSOR_TYPES: list[BinarySensorEntityDescription] = [
]
def _add_entities(
coordinator: PVCoordinator, async_add_entities: AddEntitiesCallback
) -> None:
"""Create binary sensor entities for a single shade coordinator."""
async_add_entities(
[
PVBinarySensor(coordinator, descr, format_mac(coordinator.address))
for descr in BINARY_SENSOR_TYPES
]
)
async def async_setup_entry(
hass: HomeAssistant,
_hass: HomeAssistant,
config_entry: ConfigEntryType,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Add sensors for passed config_entry in Home Assistant."""
async_setup_shade_platform(hass, config_entry, async_add_entities, _add_entities)
coord: PVCoordinator = config_entry.runtime_data
for descr in BINARY_SENSOR_TYPES:
async_add_entities(
[PVBinarySensor(coord, descr, format_mac(config_entry.unique_id))]
)
class PVBinarySensor(
PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySensorEntity
): # type: ignore[reportIncompatibleMethodOverride]
class PVBinarySensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySensorEntity): # type: ignore[reportIncompatibleMethodOverride]
"""The generic PV binary sensor implementation."""
def __init__(
@@ -58,7 +49,7 @@ class PVBinarySensor(
descr: BinarySensorEntityDescription,
unique_id: str,
) -> None:
"""Initialize PV binary sensor."""
"""Intialize PV binary sensor."""
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
self._attr_device_info = coord.device_info
self._attr_has_entity_name = True

View File

@@ -1,71 +0,0 @@
"""Hunter Douglas Powerview cover."""
from typing import Final
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.button import (
ButtonDeviceClass,
ButtonEntity,
ButtonEntityDescription,
)
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import ConfigEntryType, async_setup_shade_platform
from .const import DOMAIN, LOGGER
from .coordinator import PVCoordinator
BUTTONS_SHADE: Final = [
ButtonEntityDescription(
key="identify",
device_class=ButtonDeviceClass.IDENTIFY,
entity_category=EntityCategory.DIAGNOSTIC,
),
]
def _add_entities(
coordinator: PVCoordinator, async_add_entities: AddEntitiesCallback
) -> None:
"""Create button entities for a single shade coordinator."""
async_add_entities(
[PowerViewButton(coordinator, descr) for descr in BUTTONS_SHADE]
)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntryType,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the button platform."""
async_setup_shade_platform(hass, config_entry, async_add_entities, _add_entities)
class PowerViewButton(PassiveBluetoothCoordinatorEntity[PVCoordinator], ButtonEntity): # type: ignore[reportIncompatibleVariableOverride]
"""Representation of a powerview shade."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: PVCoordinator,
description: ButtonEntityDescription,
) -> None:
"""Initialize the shade."""
self.entity_description = description
self._coord: PVCoordinator = coordinator
self._attr_device_info = self._coord.device_info
self._attr_unique_id = (
f"{DOMAIN}_{format_mac(self._coord.address)}_{ButtonDeviceClass.IDENTIFY}"
)
super().__init__(coordinator)
async def async_press(self) -> None:
"""Handle the button press."""
LOGGER.debug("identify cover")
await self._coord.api.identify()

View File

@@ -1,253 +1,42 @@
"""Config flow for Hunter Douglas PowerView BLE integration."""
"""Config flow for BLE Battery Management System integration."""
import hashlib
import struct
from dataclasses import dataclass
from typing import Any
import aiohttp
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.bluetooth import BluetoothServiceInfoBleak
from homeassistant.components.bluetooth import (
BluetoothServiceInfoBleak,
async_discovered_service_info,
)
from homeassistant.config_entries import ConfigFlowResult
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import (
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from homeassistant.const import CONF_ADDRESS
from .const import CONF_HOME_KEY, CONF_HUB_URL, DOMAIN, LOGGER, MFCT_ID
# from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
def _hub_unique_id(home_key: str) -> str:
"""Derive a stable unique ID for a hub entry from the home key."""
if home_key:
digest = hashlib.sha256(home_key.encode()).hexdigest()[:16]
return f"pvhome_{digest}"
return "pvhome_unencrypted"
def _parse_key_response(ble_name: str, result: dict) -> bytes | None: # noqa: PLR0911
"""Parse a shade exec response and return the 16-byte key, or None."""
if result.get("err"):
err_msg = (result.get("responses") or [{}])[0].get("errMsg", "unknown")
LOGGER.warning(
"Shade %s: hub BLE command failed (err=%s: %s)",
ble_name,
result["err"],
err_msg,
)
return None
responses = result.get("responses", [])
if len(responses) != 1 or "hex" not in responses[0]:
LOGGER.warning(
"Shade %s returned unexpected response structure: %s",
ble_name,
result,
)
return None
response_bytes = bytes.fromhex(responses[0]["hex"])
if len(response_bytes) < 5:
LOGGER.warning(
"Shade %s response too short (%d bytes)", ble_name, len(response_bytes)
)
return None
_s, _c, _q, length = struct.unpack("<BBBB", response_bytes[0:4])
if len(response_bytes) != 4 + length:
LOGGER.warning(
"Shade %s frame length mismatch (header=%d, actual=%d)",
ble_name,
4 + length,
len(response_bytes),
)
return None
if response_bytes[4] != 0:
LOGGER.warning(
"Shade %s returned error status %d", ble_name, response_bytes[4]
)
return None
key_data = response_bytes[5:]
if len(key_data) != 16:
LOGGER.warning(
"Shade %s returned key of wrong length (%d, expected 16)",
ble_name,
len(key_data),
)
return None
return key_data
async def _fetch_key_from_hub(
hass: HomeAssistant, hub_url: str
) -> bytes:
"""Fetch 16-byte homekey from a PowerView G3 hub.
Tries each shade on the hub until one returns a valid key.
The key is network-wide so any reachable shade returns the same value.
The hub must establish a BLE connection to each shade before it can proxy
the key request. On the first pass that connection is often not yet open,
so the hub returns an error immediately. A second pass (after a short
pause to let the hub complete its BLE connections) reliably succeeds.
Raises ValueError on protocol/key errors.
Raises aiohttp.ClientError on network errors.
Raises asyncio.TimeoutError on timeout.
"""
session = async_get_clientsession(hass)
timeout = aiohttp.ClientTimeout(total=10)
async with session.get(f"{hub_url}/home/shades", timeout=timeout) as resp:
resp.raise_for_status()
shades = await resp.json(content_type=None)
if not shades:
raise ValueError("No shades found on the hub")
# Sort by signal strength (strongest first) — a stronger signal means the
# hub is more likely to have an active BLE connection to that shade.
shades.sort(key=lambda s: s.get("signalStrength", -100), reverse=True)
ble_names = [s["bleName"] for s in shades if s.get("bleName")]
if not ble_names:
raise ValueError("No BLE-capable shades found on the hub")
# GetShadeKey BLE request: sid=251, cid=18, seqId=1, data_len=0
request_frame = struct.pack("<BBBB", 251, 18, 1, 0)
last_error: Exception = ValueError("No shades responded")
for ble_name in ble_names:
try:
async with session.post(
f"{hub_url}/home/shades/exec?shades={ble_name}",
json={"hex": request_frame.hex()},
timeout=timeout,
) as resp:
resp.raise_for_status()
result = await resp.json(content_type=None)
except (TimeoutError, aiohttp.ClientError) as ex:
LOGGER.warning("Shade %s unreachable: %s", ble_name, ex)
last_error = ex
continue
key_data = _parse_key_response(ble_name, result)
if key_data is not None:
return key_data
raise ValueError(f"No reachable shade returned a valid key: {last_error}")
_HOMEKEY_SCHEMA = vol.Schema(
{
vol.Required("key_method", default="hub"): SelectSelector(
SelectSelectorConfig(
options=[
SelectOptionDict(
value="hub",
label="Fetch automatically from PowerView hub",
),
SelectOptionDict(
value="manual",
label="Enter key manually (32 hex characters)",
),
SelectOptionDict(
value="skip",
label="Skip (no key — controls disabled for encrypted shades)",
),
]
)
),
vol.Optional(CONF_HUB_URL, default="http://powerview-g3.local"): TextSelector(
TextSelectorConfig(type=TextSelectorType.URL)
),
vol.Optional("home_key", default=""): TextSelector(
TextSelectorConfig(type=TextSelectorType.TEXT)
),
}
)
from .api import UUID_COV_SERVICE as UUID
from .const import DOMAIN, LOGGER, MFCT_ID
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Hunter Douglas PowerView BLE."""
"""Handle a config flow for BT Battery Management System."""
VERSION = 2
MINOR_VERSION = 1
VERSION = 1
MINOR_VERSION = 0
@dataclass
class DiscoveredDevice:
"""A discovered bluetooth device."""
name: str
discovery_info: BluetoothServiceInfoBleak
def __init__(self) -> None:
"""Initialize the config flow."""
self._home_key: str = ""
self._hub_url: str = ""
def _create_entry(self) -> ConfigFlowResult:
"""Create the hub config entry."""
data: dict[str, str] = {CONF_HOME_KEY: self._home_key}
if self._hub_url:
data[CONF_HUB_URL] = self._hub_url
return self.async_create_entry(title="PowerView Home", data=data)
def _validate_manual_key(
self, user_input: dict[str, Any], errors: dict[str, str]
) -> bool:
"""Validate a manually entered hex key.
Returns True on success, False on validation error.
"""
raw = user_input.get("home_key", "").strip()
if "\\x" in raw:
raw = raw.replace("\\x", "")
if len(raw) != 32:
errors["home_key"] = "invalid_key_length"
return False
try:
bytes.fromhex(raw)
except ValueError:
errors["home_key"] = "invalid_key_format"
return False
self._home_key = raw.lower()
return True
async def _validate_homekey_input(
self, user_input: dict[str, Any], errors: dict[str, str]
) -> bool:
"""Parse and validate homekey user_input, populating self state.
Returns True on success, False on validation error (errors dict is populated).
"""
method = user_input.get("key_method", "skip")
if method == "skip":
self._home_key = ""
return True
if method == "manual":
return self._validate_manual_key(user_input, errors)
if method != "hub":
return False
hub_url = user_input.get(CONF_HUB_URL, "").rstrip("/")
_HUB_ERROR_MAP: dict[type[Exception], str] = {
aiohttp.ClientResponseError: "hub_http_error",
aiohttp.ClientConnectionError: "hub_connection_error",
TimeoutError: "hub_timeout",
ValueError: "hub_protocol_error",
}
try:
key = await _fetch_key_from_hub(self.hass, hub_url)
except tuple(_HUB_ERROR_MAP) as ex:
LOGGER.warning("Hub key fetch failed: %s", ex)
errors[CONF_HUB_URL] = _HUB_ERROR_MAP[type(ex)]
return False
self._home_key = key.hex()
self._hub_url = hub_url
return True
self._discovered_device: ConfigFlow.DiscoveredDevice | None = None
self._discovered_devices: dict[str, ConfigFlow.DiscoveredDevice] = {}
async def async_step_bluetooth(
self, discovery_info: BluetoothServiceInfoBleak
@@ -255,59 +44,84 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a flow initialized by Bluetooth discovery."""
LOGGER.debug("Bluetooth device detected: %s", discovery_info)
# Derive a home-wide unique ID from the home_id embedded in the BLE
# advertisement (bytes 0-1 of the manufacturer payload). All shades on
# the same network share the same home_id, so HA deduplicates every
# subsequent shade discovery into this single flow via
# "already_in_progress" rather than spawning one notification per shade.
mfr_data = bytearray(
discovery_info.manufacturer_data.get(MFCT_ID, b"")
)
if len(mfr_data) >= 2:
home_id = int.from_bytes(mfr_data[0:2], byteorder="little")
unique_id = f"pvhome_{home_id}"
else:
unique_id = DOMAIN
await self.async_set_unique_id(unique_id)
await self.async_set_unique_id(discovery_info.address)
self._abort_if_unique_id_configured()
# If a hub entry already exists (unique_id may differ), shades are
# auto-discovered internally — nothing more for the user to do.
for entry in self._async_current_entries():
if entry.version >= 2:
return self.async_abort(reason="already_configured")
self._discovered_device = ConfigFlow.DiscoveredDevice(
discovery_info.name, discovery_info
)
self.context["title_placeholders"] = {"name": self._discovered_device.name}
return await self.async_step_bluetooth_confirm()
# No hub entry yet — redirect to user setup
return await self.async_step_user()
async def async_step_bluetooth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm bluetooth device discovery."""
assert self._discovered_device is not None
LOGGER.debug("confirm step for %s", self._discovered_device.name)
if user_input is not None:
return self.async_create_entry(
title=self._discovered_device.name,
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
)
self._set_confirm_only()
return self.async_show_form(
step_id="bluetooth_confirm",
description_placeholders={"name": self._discovered_device.name},
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the user step — create a hub entry."""
"""Handle the user step to pick discovered device."""
LOGGER.debug("user step")
# Only one hub entry allowed (per key, but for simplicity one total)
for entry in self._async_current_entries():
if entry.version >= 2:
return self.async_abort(reason="single_instance_allowed")
errors: dict[str, str] = {}
if user_input is not None and await self._validate_homekey_input(
user_input, errors
):
unique_id = _hub_unique_id(self._home_key)
await self.async_set_unique_id(unique_id, raise_on_progress=False)
if user_input is not None:
address = user_input[CONF_ADDRESS]
await self.async_set_unique_id(address, raise_on_progress=False)
self._abort_if_unique_id_configured()
return self._create_entry()
self._discovered_device = self._discovered_devices[address]
self.context["title_placeholders"] = {"name": self._discovered_device.name}
return self.async_create_entry(
title=self._discovered_device.name,
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
)
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass, False):
address = discovery_info.address
if address in current_addresses or address in self._discovered_devices:
continue
if MFCT_ID not in discovery_info.manufacturer_data:
continue
if UUID not in discovery_info.service_uuids:
continue
self._discovered_devices[address] = ConfigFlow.DiscoveredDevice(
discovery_info.name, discovery_info
)
if not self._discovered_devices:
return self.async_abort(reason="no_devices_found")
titles = []
for address, discovery in self._discovered_devices.items():
titles.append({"value": address, "label": discovery.name})
return self.async_show_form(
step_id="user",
data_schema=_HOMEKEY_SCHEMA,
errors=errors,
description_placeholders={
"hub_url_example": "http://powerview-g3.local",
},
data_schema=vol.Schema(
{
vol.Required(CONF_ADDRESS): SelectSelector(
SelectSelectorConfig(options=titles)
)
}
),
)

View File

@@ -3,16 +3,25 @@
import logging
from typing import Final
# from bleak.uuids import normalize_uuid_str
# from homeassistant.const import ( # noqa: F401
# ATTR_BATTERY_CHARGING,
# ATTR_BATTERY_LEVEL,
# ATTR_TEMPERATURE,
# ATTR_VOLTAGE,
# )
DOMAIN: Final[str] = "hunterdouglas_powerview_ble"
LOGGER: Final = logging.getLogger(__package__)
MFCT_ID: Final[int] = 2073
TIMEOUT: Final[int] = 5
CONF_HOME_KEY: Final[str] = "home_key"
CONF_HUB_URL: Final[str] = "hub_url"
# put the key here, needs to be 16 bytes long, e.g.
# HOME_KEY: Final[bytes] = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
HOME_KEY: Final[bytes] = b""
# dispatcher signal for newly discovered shades (format with entry_id)
SIGNAL_NEW_SHADE: Final[str] = f"{DOMAIN}_new_shade_{{entry_id}}"
# attributes (do not change)
ATTR_RSSI: Final[str] = "rssi"

View File

@@ -3,7 +3,6 @@
from typing import Any
from bleak.backends.device import BLEDevice
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth.const import DOMAIN as BLUETOOTH_DOMAIN
from homeassistant.components.bluetooth.passive_update_coordinator import (
@@ -12,36 +11,27 @@ from homeassistant.components.bluetooth.passive_update_coordinator import (
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import CONNECTION_BLUETOOTH, DeviceInfo
from .api import SHADE_TYPE, PowerViewBLE, ShadeCapability, get_shade_capabilities
from .const import ATTR_RSSI, CONF_HOME_KEY, DOMAIN, LOGGER
from .api import SHADE_TYPE, PowerViewBLE
from .const import ATTR_RSSI, DOMAIN, HOME_KEY, LOGGER
class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
"""Update coordinator for a battery management system."""
def __init__(
self,
hass: HomeAssistant,
ble_device: BLEDevice,
data: dict[str, Any],
friendly_name: str | None = None,
self, hass: HomeAssistant, ble_device: BLEDevice, data: dict[str, Any]
) -> None:
"""Initialize BMS data coordinator."""
assert ble_device.name is not None
self._friendly_name = friendly_name or ble_device.name
home_key_hex: str = data.get(CONF_HOME_KEY, "")
home_key: bytes = (
bytes.fromhex(home_key_hex) if len(home_key_hex) == 32 else b""
)
self.api = PowerViewBLE(ble_device, home_key)
self._mac = ble_device.address
self.api = PowerViewBLE(ble_device, HOME_KEY)
self.data: dict[str, int | float | bool] = {}
self._manuf_dat = data.get("manufacturer_data")
self.dev_details: dict[str, str] = {}
self.velocity: int = 0
LOGGER.debug(
"Initializing coordinator for %s (%s)",
self._friendly_name,
ble_device.name,
ble_device.address,
)
super().__init__(
@@ -51,19 +41,6 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
bluetooth.BluetoothScanningMode.ACTIVE,
)
@property
def type_id(self) -> int | None:
"""Return the shade type ID from manufacturer data or live BLE data."""
if self._manuf_dat:
return int(bytes.fromhex(self._manuf_dat)[2])
live = self.data.get("type_id")
return int(live) if live is not None else None
@property
def shade_capabilities(self) -> ShadeCapability:
"""Return the shade capabilities based on type ID."""
return get_shade_capabilities(self.type_id)
async def query_dev_info(self) -> None:
"""Receive detailed information from device."""
LOGGER.debug("%s: querying device info", self.name)
@@ -72,23 +49,24 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
@property
def device_info(self) -> DeviceInfo:
"""Return detailed device information for GUI."""
LOGGER.debug("%s: device_info, %s", self._friendly_name, self.dev_details)
LOGGER.debug("%s: device_info, %s", self.name, self.dev_details)
return DeviceInfo(
identifiers={
(DOMAIN, self.address),
(DOMAIN, self.name),
(BLUETOOTH_DOMAIN, self.address),
},
connections={(CONNECTION_BLUETOOTH, self.address)},
name=self._friendly_name,
name=self.name,
configuration_url=None,
# properties used in GUI:
manufacturer="Hunter Douglas",
model=(
str(SHADE_TYPE.get(self.type_id, "unknown"))
if self.type_id is not None
str(SHADE_TYPE.get(int(bytes.fromhex(self._manuf_dat)[2]), "unknown"))
if self._manuf_dat
else None
),
model_id=(
str(self.type_id) if self.type_id is not None else None
str(bytes.fromhex(self._manuf_dat)[2]) if self._manuf_dat else None
),
serial_number=self.dev_details.get("serial_nr"),
sw_version=self.dev_details.get("sw_rev"),
@@ -98,11 +76,11 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
@property
def device_present(self) -> bool:
"""Check if a device is present."""
return bluetooth.async_address_present(self.hass, self.address, connectable=True)
return bluetooth.async_address_present(self.hass, self._mac, connectable=True)
def _async_stop(self) -> None:
"""Shutdown coordinator and any connection."""
LOGGER.debug("%s: shutting down BMS device", self.name)
LOGGER.debug("%s: shuting down BMS device", self.name)
self.hass.async_create_task(self.api.disconnect())
super()._async_stop()
@@ -114,19 +92,18 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
) -> None:
"""Handle a Bluetooth event."""
# if not self.dev_details:
# self.hass.async_create_task(self._get_device_info())
LOGGER.debug("BLE event %s: %s", change, service_info.manufacturer_data)
self.api.set_ble_device(service_info.device)
new_data: dict[str, int | float | bool] = {ATTR_RSSI: service_info.rssi}
self.data = {ATTR_RSSI: service_info.rssi}
if change == bluetooth.BluetoothChange.ADVERTISEMENT:
new_data.update(
self.data.update(
self.api.dec_manufacturer_data(
bytearray(service_info.manufacturer_data.get(2073, b""))
)
)
self.api.encrypted = bool(new_data.get("home_id"))
self.api.encrypted = bool(self.data.get("home_id"))
if new_data == self.data:
return
self.data = new_data
LOGGER.debug("data sample %s", self.data)
super()._async_handle_bluetooth_event(service_info, change)

View File

@@ -3,7 +3,6 @@
from typing import Any, Final
from bleak.exc import BleakError
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
@@ -16,43 +15,29 @@ from homeassistant.components.cover import (
CoverEntity,
CoverEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.device_registry import DeviceInfo, format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import ConfigEntryType, async_setup_shade_platform
from .api import CLOSED_POSITION, OPEN_POSITION
from .const import DOMAIN, LOGGER
from .const import DOMAIN, HOME_KEY, LOGGER
from .coordinator import PVCoordinator
def _add_entities(
coordinator: PVCoordinator, async_add_entities: AddEntitiesCallback
) -> None:
"""Create cover entities for a single shade coordinator."""
caps = coordinator.shade_capabilities
if caps.tilt_only:
entities: list[PowerViewCover] = [PowerViewCoverTiltOnly(coordinator)]
elif caps.is_tilt_on_closed:
entities = [PowerViewCoverTiltOnClosed(coordinator)]
elif caps.has_tilt:
entities = [PowerViewCoverTilt(coordinator)]
elif caps.is_top_down:
entities = [PowerViewCoverTopDown(coordinator)]
else:
entities = [PowerViewCover(coordinator)]
async_add_entities(entities)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntryType,
_hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the cover platform."""
async_setup_shade_platform(hass, config_entry, async_add_entities, _add_entities)
"""Set up the demo cover platform."""
coordinator: PVCoordinator = config_entry.runtime_data
async_add_entities(
[PowerViewCoverTilt(coordinator)]
if coordinator.dev_details.get("model") in ["51", "62"]
else [PowerViewCover(coordinator)]
)
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
@@ -73,7 +58,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
) -> None:
"""Initialize the shade."""
LOGGER.debug("%s: init() PowerViewCover", coordinator.name)
self._attr_name = None
self._attr_name = CoverDeviceClass.SHADE
self._coord: PVCoordinator = coordinator
self._attr_device_info = self._coord.device_info
self._target_position: int | None = round(
@@ -84,6 +69,11 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
)
super().__init__(coordinator)
@property
def device_info(self) -> DeviceInfo: # type: ignore[reportIncompatibleVariableOverride]
"""Return the device_info of the device."""
return self._coord.device_info
@property
def is_opening(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is opening or not."""
@@ -112,7 +102,9 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
@property
def supported_features(self) -> CoverEntityFeature: # type: ignore[reportIncompatibleVariableOverride]
"""Flag supported features, disable control if encryption is needed."""
if self._coord.data.get("home_id") and not self._coord.api.has_key:
if (
self._coord.data.get("home_id") and len(HOME_KEY) != 16
) or self._coord.data.get("battery_charging"):
return CoverEntityFeature(0)
return super().supported_features
@@ -137,10 +129,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
return
self._target_position = round(target_position)
try:
await self._coord.api.set_position(
round(target_position),
velocity=self._coord.velocity,
)
await self._coord.api.set_position(round(target_position))
self.async_write_ha_state()
except BleakError as err:
LOGGER.error(
@@ -160,7 +149,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
return
try:
self._target_position = OPEN_POSITION
await self._coord.api.open(velocity=self._coord.velocity)
await self._coord.api.open()
self.async_write_ha_state()
except BleakError as err:
LOGGER.error("Failed to open cover '%s': %s", self.name, err)
@@ -173,7 +162,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
return
try:
self._target_position = CLOSED_POSITION
await self._coord.api.close(velocity=self._coord.velocity)
await self._coord.api.close()
self.async_write_ha_state()
except BleakError as err:
LOGGER.error("Failed to close cover '%s': %s", self.name, err)
@@ -208,7 +197,6 @@ class PowerViewCoverTilt(PowerViewCover):
self,
coordinator: PVCoordinator,
) -> None:
"""Initialize the shade with tilt."""
LOGGER.debug("%s: init() PowerViewCoverTilt", coordinator.name)
super().__init__(coordinator)
@@ -234,9 +222,7 @@ class PowerViewCoverTilt(PowerViewCover):
try:
await self._coord.api.set_position(
self.current_cover_position,
tilt=target_position,
velocity=self._coord.velocity,
self.current_cover_position, tilt=target_position
)
self.async_write_ha_state()
except BleakError as err:
@@ -249,7 +235,7 @@ class PowerViewCoverTilt(PowerViewCover):
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover."""
await self.async_stop_cover(**kwargs)
await self.async_stop_cover(kwargs=kwargs)
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
@@ -262,140 +248,3 @@ class PowerViewCoverTilt(PowerViewCover):
LOGGER.debug("close cover tilt")
_kwargs = {**kwargs, ATTR_TILT_POSITION: CLOSED_POSITION}
await self.async_set_cover_tilt_position(**_kwargs)
class PowerViewCoverTiltOnClosed(PowerViewCoverTilt):
"""Representation of a PowerView shade whose tilt is only available when closed.
Examples: Bottom Up 90° (type 18), Twist (type 44).
If a tilt command arrives while the shade is open, the shade is closed first
so the tilt mechanism is engaged before the command is sent.
"""
def __init__(self, coordinator: PVCoordinator) -> None:
"""Initialize the shade."""
LOGGER.debug("%s: init() PowerViewCoverTiltOnClosed", coordinator.name)
super().__init__(coordinator)
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the tilt to a specific position, closing first if needed."""
if self.current_cover_position != CLOSED_POSITION:
LOGGER.debug("tilt-on-closed: closing shade before tilting")
try:
self._target_position = CLOSED_POSITION
await self._coord.api.close(velocity=self._coord.velocity)
self.async_write_ha_state()
except BleakError as err:
LOGGER.error("Failed to close cover '%s' before tilt: %s", self.name, err)
self._reset_target_position()
return
await super().async_set_cover_tilt_position(**kwargs)
class PowerViewCoverTopDown(PowerViewCover):
"""Representation of a top-down PowerView shade.
The device position axis is inverted: device 0 = open (fabric retracted),
device 100 = closed (fabric fully extended). We translate at the boundary
so HA's standard 0=closed / 100=open convention is preserved.
"""
@property
def current_cover_position(self) -> int | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return current position, inverting the device axis."""
pos: Final = self._coord.data.get(ATTR_CURRENT_POSITION)
return OPEN_POSITION - round(pos) if pos is not None else None
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the cover to a specific position, inverting for the device."""
target_position: Final = kwargs.get(ATTR_POSITION)
if target_position is not None:
inverted = OPEN_POSITION - round(target_position)
LOGGER.debug("set top-down cover to position %f (device %i)", target_position, inverted)
if self.current_cover_position == round(target_position) and not (
self.is_closing or self.is_opening
):
return
self._target_position = round(target_position)
try:
await self._coord.api.set_position(
inverted,
velocity=self._coord.velocity,
)
self.async_write_ha_state()
except BleakError as err:
LOGGER.error(
"Failed to move cover '%s' to %f%%: %s",
self.name,
target_position,
err,
)
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the cover (send device position 0)."""
LOGGER.debug("open top-down cover")
if self.current_cover_position == OPEN_POSITION:
return
try:
self._target_position = OPEN_POSITION
await self._coord.api.set_position(CLOSED_POSITION, velocity=self._coord.velocity)
self.async_write_ha_state()
except BleakError as err:
LOGGER.error("Failed to open cover '%s': %s", self.name, err)
self._reset_target_position()
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the cover (send device position 100)."""
LOGGER.debug("close top-down cover")
if self.current_cover_position == CLOSED_POSITION:
return
try:
self._target_position = CLOSED_POSITION
await self._coord.api.set_position(OPEN_POSITION, velocity=self._coord.velocity)
self.async_write_ha_state()
except BleakError as err:
LOGGER.error("Failed to close cover '%s': %s", self.name, err)
self._reset_target_position()
class PowerViewCoverTiltOnly(PowerViewCoverTilt):
"""Representation of a PowerView shade with additional tilt functionality."""
OPENCLOSED_THRESHOLD = 5
_attr_device_class = CoverDeviceClass.BLIND
_attr_supported_features = (
CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.STOP_TILT
| CoverEntityFeature.SET_TILT_POSITION
)
def __init__(
self,
coordinator: PVCoordinator,
) -> None:
"""Initialize the shade with tilt only."""
LOGGER.debug("%s: init() PowerViewCoverTiltOnly", coordinator.name)
super().__init__(coordinator)
@property
def is_opening(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is opening or not."""
return False
@property
def is_closing(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is closing or not."""
return False
@property
def is_closed(self) -> bool: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is closed."""
return isinstance(self.current_cover_tilt_position, int) and (
self.current_cover_tilt_position
>= OPEN_POSITION - PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
or self.current_cover_tilt_position
<= CLOSED_POSITION + PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
)

View File

@@ -11,10 +11,10 @@
"config_flow": true,
"dependencies": ["bluetooth_adapters"],
"documentation": "https://github.com/patman15/hdpv_ble",
"integration_type": "hub",
"integration_type": "device",
"iot_class": "local_polling",
"issue_tracker": "https://github.com/patman15/hdpv_ble/issues",
"loggers": ["hunterdouglas_powerview_ble"],
"requirements": ["cryptography>=43.0.0"],
"version": "0.24"
"version": "0.22"
}

View File

@@ -1,74 +0,0 @@
"""Hunter Douglas PowerView velocity control."""
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.number import NumberMode, RestoreNumber
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import ConfigEntryType, async_setup_shade_platform
from .const import DOMAIN, LOGGER
from .coordinator import PVCoordinator
def _add_entities(
coordinator: PVCoordinator, async_add_entities: AddEntitiesCallback
) -> None:
"""Create velocity number entity for a single shade coordinator."""
async_add_entities([PowerViewVelocity(coordinator)])
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntryType,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the velocity number entity."""
async_setup_shade_platform(hass, config_entry, async_add_entities, _add_entities)
class PowerViewVelocity(
PassiveBluetoothCoordinatorEntity[PVCoordinator], RestoreNumber
): # type: ignore[reportIncompatibleVariableOverride]
"""Number entity to control shade movement velocity."""
_attr_has_entity_name = True
_attr_name = "Velocity"
_attr_icon = "mdi:speedometer"
_attr_mode = NumberMode.SLIDER
_attr_native_min_value = 0
_attr_native_max_value = 100
_attr_native_step = 1
_attr_entity_category = EntityCategory.CONFIG
def __init__(self, coordinator: PVCoordinator) -> None:
"""Initialize the velocity entity."""
self._coord = coordinator
self._attr_device_info = self._coord.device_info
self._attr_unique_id = (
f"{DOMAIN}_{format_mac(self._coord.address)}_velocity"
)
super().__init__(coordinator)
@property
def native_value(self) -> int:
"""Return the current velocity value."""
return self._coord.velocity
async def async_added_to_hass(self) -> None:
"""Restore last known velocity on startup."""
await super().async_added_to_hass()
last_data = await self.async_get_last_number_data()
if last_data and last_data.native_value is not None:
self._coord.velocity = int(last_data.native_value)
LOGGER.debug(
"%s: restored velocity to %s", self._coord.name, self._coord.velocity
)
async def async_set_native_value(self, value: float) -> None:
"""Set the velocity value."""
self._coord.velocity = int(value)
self.async_write_ha_state()

View File

@@ -3,8 +3,14 @@
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
from homeassistant.components.sensor.const import SensorDeviceClass, SensorStateClass
from homeassistant.components.sensor import (
SensorEntity,
SensorEntityDescription,
)
from homeassistant.components.sensor.const import (
SensorDeviceClass,
SensorStateClass,
)
from homeassistant.const import (
ATTR_BATTERY_LEVEL,
PERCENTAGE,
@@ -15,7 +21,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from . import ConfigEntryType, async_setup_shade_platform
from . import ConfigEntryType
from .const import ATTR_RSSI, DOMAIN
from .coordinator import PVCoordinator
@@ -39,25 +45,18 @@ SENSOR_TYPES: list[SensorEntityDescription] = [
]
def _add_entities(
coordinator: PVCoordinator, async_add_entities: AddEntitiesCallback
) -> None:
"""Create sensor entities for a single shade coordinator."""
async_add_entities(
[
PVSensor(coordinator, descr, format_mac(coordinator.address))
for descr in SENSOR_TYPES
]
)
async def async_setup_entry(
hass: HomeAssistant,
_hass: HomeAssistant,
config_entry: ConfigEntryType,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Add sensors for passed config_entry in Home Assistant."""
async_setup_shade_platform(hass, config_entry, async_add_entities, _add_entities)
pv_dev: PVCoordinator = config_entry.runtime_data
for descr in SENSOR_TYPES:
async_add_entities(
[PVSensor(pv_dev, descr, format_mac(config_entry.unique_id))]
)
class PVSensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], SensorEntity): # type: ignore[reportIncompatibleMethodOverride]
@@ -68,7 +67,7 @@ class PVSensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], SensorEntity):
def __init__(
self, pv_dev: PVCoordinator, descr: SensorEntityDescription, unique_id: str
) -> None:
"""Initialize the BMS sensor."""
"""Intitialize the BMS sensor."""
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
self._attr_device_info = pv_dev.device_info
self.entity_description = descr

View File

@@ -1,32 +1,15 @@
{
"config": {
"flow_title": "PowerView Home Setup",
"flow_title": "Setup {name}",
"step": {
"user": {
"title": "Set up PowerView Home",
"description": "Configure your PowerView encryption key. All shades on your network will be discovered automatically via Bluetooth.",
"data": {
"key_method": "Key source",
"hub_url": "PowerView hub URL",
"home_key": "HomeKey (32 hex characters or \\xNN format)"
},
"data_description": {
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
}
"bluetooth_confirm": {
"description": "[%key:component::bluetooth::config::step::bluetooth_confirm::description%]"
}
},
"error": {
"invalid_key_format": "HomeKey must be a valid hexadecimal string",
"invalid_key_length": "HomeKey must be exactly 32 hex characters (16 bytes)",
"hub_connection_error": "Cannot connect to the PowerView hub",
"hub_http_error": "Hub returned an HTTP error",
"hub_timeout": "Connection to hub timed out",
"hub_protocol_error": "Hub returned an unexpected response"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]"
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]",
"not_supported": "Device not supported"
}
}
}

View File

@@ -1,32 +1,15 @@
{
"config": {
"flow_title": "PowerView Home Setup",
"step": {
"user": {
"title": "Set up PowerView Home",
"description": "Configure your PowerView encryption key. All shades on your network will be discovered automatically via Bluetooth.",
"data": {
"key_method": "Key source",
"hub_url": "PowerView hub URL",
"home_key": "HomeKey (32 hex characters or \\xNN format)"
},
"data_description": {
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
}
}
},
"error": {
"invalid_key_format": "HomeKey must be a valid hexadecimal string",
"invalid_key_length": "HomeKey must be exactly 32 hex characters (16 bytes)",
"hub_connection_error": "Cannot connect to the PowerView hub",
"hub_http_error": "Hub returned an HTTP error",
"hub_timeout": "Connection to hub timed out",
"hub_protocol_error": "Hub returned an unexpected response"
},
"abort": {
"already_configured": "Device is already configured",
"single_instance_allowed": "Already configured. Only a single configuration possible."
"no_devices_found": "No devices found on the network",
"not_supported": "Device not supported"
},
"flow_title": "Setup {name}",
"step": {
"bluetooth_confirm": {
"description": "Do you want to set up {name}?"
}
}
}
}

View File

@@ -2,12 +2,6 @@
* Emulate a Hunter Douglas PowerView cover device using ESP32
* used e.g. to gain the home_key from an existing installation via BLE
*
* REQUIRES:
* - ESP32 Board Definitions 3.0.x (tested on 3.0.7)
* - WolfSSL 5.7.x (tested on 5.7.6)
* - Phone Region: Potentially an alternative region depending on the app response
* - e.g. To add Parkland shades in the US, phone region set to the UK temporarily
*
* TODO:
* - cleanup code
* - think about emulating a remote
@@ -17,13 +11,9 @@
*/
#define NAME "myPVcover"
const uint16_t SW_VERSION = 391;
const char *SERIAL_NR = "01234567890ABCDEF";
const uint16_t TYP_ID = 42; // 62
const uint16_t MODEL_ID = 224;
const uint16_t FW_REVISION = 27;
const uint32_t HW_REVISION = 171103;
const uint8_t BATTERY_LEVEL = 42;
#define FW_VERSION "391"
#define SERIAL_NR "01234567890ABCDEF"
#include <BLEDevice.h>
#include <BLEServer.h>
@@ -50,11 +40,7 @@ int devId = INVALID_DEVID; //if not using async INVALID_DEVID is default
#define DEV_SERVICE_UUID BLEUUID("180A")
#define SER_CHAR_UUID BLEUUID("2A25")
#define MAN_CHAR_UUID BLEUUID("2A29")
#define MOD_CHAR_UUID BLEUUID("2A24")
#define FWR_CHAR_UUID BLEUUID("2A26")
#define HWR_CHAR_UUID BLEUUID("2A27")
#define SWR_CHAR_UUID BLEUUID("2A28")
#define SWC_CHAR_UUID BLEUUID("2A28")
#define BAT_SERVICE_UUID BLEUUID("180F")
#define BAT_CHAR_UUID BLEUUID("2A19")
@@ -83,7 +69,7 @@ struct notification {
};
BLECharacteristic *pCharacteristic_cover, *pCharacteristic_fw, *pCharacteristic_unknown, *pCharacteristic_bat;
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser, *pCharacteristic_man, *pCharacteristic_mod, *pCharacteristic_fwr, *pCharacteristic_hwr;
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser;
BLEServer *pServer = NULL;
bool deviceConnected = false;
bool oldDeviceConnected = false;
@@ -172,12 +158,12 @@ void decode(BLECharacteristic *pChar) {
memcpy((void *)&msg, data_dec, 4);
Serial.printf("\t message: SRV: %02x, CMD %02x, SEQ %i, LEN %i\n", msg.serviceID, msg.cmdID, msg.sequence, msg.data_len);
// special responses (static data!)
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, (uint8_t)(SW_VERSION & 0xFF), (uint8_t)(SW_VERSION >> 8), 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, TYP_ID, MODEL_ID, 0x08 }; // HW diagnostics
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
// sepecial responses (static data!)
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x2a, 0xe0, 0x08 }; // HW diagnostics
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
Serial.print("\t\t");
switch ((msg.serviceID << 8) | msg.cmdID) {
@@ -193,7 +179,7 @@ void decode(BLECharacteristic *pChar) {
break;
case 0xF711:
// identify
Serial.printf("identify: %i times\n", data_dec[4]);
Serial.printf("identify: %i\n", data_dec[4]);
resp_size = set_response(&response, (const message *)data_dec);
break;
case 0xF7B8:
@@ -222,8 +208,8 @@ void decode(BLECharacteristic *pChar) {
case 0xFB02:
// set shade key
Serial.print("set shade key: ");
print_hex(&data_raw[4], data_len - 4, "\\x", "");
// set response before key, to acknowledge unencrypted
print_hex(&data_raw[4], data_len - 4, "\\x");
// set resonse before key, to acknowledge unencrypted
resp_size = set_response(&response, (const message *)data_dec);
if (msg.data_len == 16) {
memcpy(home_key, &data_raw[4], 16);
@@ -265,9 +251,7 @@ void decode(BLECharacteristic *pChar) {
resp_size = set_response(&response, (const message *)data_dec);
break;
default:
Serial.println(F("*********************************** unknown message (try ACK)"));
resp_size = set_response(&response, (const message *)data_dec);
break;
Serial.println(F("*********************************** unknown message"));
}
if (resp_size) {
pChar->setValue((uint8_t *)&response, resp_size);
@@ -307,7 +291,7 @@ class coverCallbacks : public BLECharacteristicCallbacks {
void onNotify(BLECharacteristic *pCharacteristic) {
Serial.printf("Cover onNotify() %s\n", pCharacteristic->toString().c_str());
}
void onStatus(BLECharacteristic *pCharacteristic, Status s, uint32_t code) {
Serial.printf("Cover onStatus() %s: %s\n", BLEstate[s], pCharacteristic->toString().c_str());
}
@@ -357,7 +341,6 @@ class genericCallbacks : public BLECharacteristicCallbacks {
void setup() {
Serial.begin(115200);
delay(1000); // wait for terminal to be ready
Serial.println(NAME " initializing ...");
BLEDevice::init(NAME);
@@ -390,7 +373,8 @@ pDesc1->setValue("cover");*/
BAT_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_bat->setCallbacks(new batteryCallbacks());
pCharacteristic_bat->setValue((uint8_t *)&BATTERY_LEVEL, 1);
uint8_t battery_level = 42;
pCharacteristic_bat->setValue(&battery_level, 1);
pCharacteristic_bat->addDescriptor(new BLE2902());
BLEService *pFWService = pServer->createService(FW_SERVICE_UUID);
@@ -406,9 +390,9 @@ pDesc1->setValue("cover");*/
BLEService *pDEVService = pServer->createService(DEV_SERVICE_UUID);
pCharacteristic_dev = pDEVService->createCharacteristic(
SWR_CHAR_UUID,
SWC_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_dev->setValue(String(SW_VERSION));
pCharacteristic_dev->setValue(FW_VERSION);
pCharacteristic_dev->setCallbacks(new genericCallbacks());
pCharacteristic_ser = pDEVService->createCharacteristic(
SER_CHAR_UUID,
@@ -416,28 +400,6 @@ pDesc1->setValue("cover");*/
pCharacteristic_ser->setValue(SERIAL_NR);
pCharacteristic_ser->setCallbacks(new genericCallbacks());
pCharacteristic_man = pDEVService->createCharacteristic(
MAN_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_man->setValue("Hunter Douglas");
pCharacteristic_man->setCallbacks(new genericCallbacks());
pCharacteristic_mod = pDEVService->createCharacteristic(
MOD_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_mod->setValue(String(TYP_ID));
pCharacteristic_mod->setCallbacks(new genericCallbacks());
pCharacteristic_fwr = pDEVService->createCharacteristic(
FWR_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_fwr->setValue(String(FW_REVISION));
pCharacteristic_fwr->setCallbacks(new genericCallbacks());
pCharacteristic_hwr = pDEVService->createCharacteristic(
HWR_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_hwr->setValue(String(HW_REVISION));
pCharacteristic_hwr->setCallbacks(new genericCallbacks());
// Start the services
pCovService->start();
pFWService->start();
@@ -446,10 +408,9 @@ pDesc1->setValue("cover");*/
// Start advertising
BLEAdvertisementData AdvertisementData;
const char adv[] = { 0x19, 0x08, 0x00, 0x00, TYP_ID, 0x00, 0x00, 0x00, 0x00, 0x00, 0xA2 };
// Hunter Douglas ^^ -- ^^ ^^key ^^ ^--pos1--^
AdvertisementData.setManufacturerData(String(adv, 11));
const String manufacturerData = String("\x19\x08\x00\x00\x2A\x00\x00\x00\x00\x00\xA2", 11);
// Hunter Douglas ^^--^^ ^^key^^ ^^ ID-Type
AdvertisementData.setManufacturerData(manufacturerData);
AdvertisementData.setPartialServices(BLEUUID(COVER_SERVICE_UUID));
AdvertisementData.setFlags((1 << 2) | (1 << 1)); // [BR/EDR Not Supported] | [LE General Discoverable Mode]

Binary file not shown.

Before

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 53 KiB

View File

@@ -1,17 +1,7 @@
# pyproject.toml
[project]
name = "hunterdouglas_powerview_ble"
classifiers = [
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.13"
]
readme = "README.md"
requires-python = ">=3.13.2"
[project.urls]
"Source Code" = "https://github.com/patman15/hdpv_ble/"
"Bug Reports" = "https://github.com/patman15/hdpv_ble/issues"
requires-python = ">=3.12.0"
[tool.pytest.ini_options]
minversion = "8.0"
@@ -24,9 +14,9 @@ testpaths = [
]
asyncio_mode = "auto"
# ruff settings from HA 2025.2.2
# ruff configuration taken from HA 2024.11.2 (less ignores)
[tool.ruff]
required-version = ">=0.9.1"
required-version = ">=0.6.8"
[tool.ruff.lint]
select = [
@@ -45,10 +35,8 @@ select = [
"B017", # pytest.raises(BaseException) should be considered evil
"B018", # Found useless attribute access. Either assign it to a variable or remove it.
"B023", # Function definition does not bind loop variable {name}
"B024", # `{name}` is an abstract base class, but it has no abstract methods or properties
"B026", # Star-arg unpacking after a keyword argument is strongly discouraged
"B032", # Possible unintentional type annotation (using :). Did you mean to assign (using =)?
"B035", # Dictionary comprehension uses static key
"B904", # Use raise from to specify exception cause
"B905", # zip() without an explicit strict= parameter
"BLE",
@@ -82,27 +70,12 @@ select = [
"RSE", # flake8-raise
"RUF005", # Consider iterable unpacking instead of concatenation
"RUF006", # Store a reference to the return value of asyncio.create_task
"RUF007", # Prefer itertools.pairwise() over zip() when iterating over successive pairs
"RUF008", # Do not use mutable default values for dataclass attributes
"RUF010", # Use explicit conversion flag
"RUF013", # PEP 484 prohibits implicit Optional
"RUF016", # Slice in indexed access to type {value_type} uses type {index_type} instead of an integer
"RUF017", # Avoid quadratic list summation
"RUF018", # Avoid assignment expressions in assert statements
"RUF019", # Unnecessary key check before dictionary access
"RUF020", # {never_like} | T is equivalent to T
"RUF021", # Parenthesize a and b expressions when chaining and and or together, to make the precedence clear
"RUF022", # Sort __all__
"RUF023", # Sort __slots__
"RUF024", # Do not pass mutable objects as values to dict.fromkeys
"RUF026", # default_factory is a positional-only argument to defaultdict
"RUF030", # print() call in assert statement is likely unintentional
"RUF032", # Decimal() called with float literal argument
"RUF033", # __post_init__ method with argument defaults
"RUF034", # Useless if-else condition
"RUF100", # Unused `noqa` directive
"RUF101", # noqa directives that use redirected rule codes
"RUF200", # Failed to parse pyproject.toml: {message}
# "RUF100", # Unused `noqa` directive; temporarily every now and then to clean them up
"S102", # Use of exec detected
"S103", # bad-file-permissions
"S108", # hardcoded-temp-file
@@ -115,7 +88,7 @@ select = [
"S317", # suspicious-xml-sax-usage
"S318", # suspicious-xml-mini-dom-usage
"S319", # suspicious-xml-pull-dom-usage
# "S320", # suspicious-xmle-tree-usage
"S320", # suspicious-xmle-tree-usage
"S601", # paramiko-call
"S602", # subprocess-popen-with-shell-equals-true
"S604", # call-with-shell-equals-true
@@ -126,7 +99,7 @@ select = [
"SLOT", # flake8-slots
"T100", # Trace found: {name} used
"T20", # flake8-print
"TC", # flake8-type-checking
"TCH", # flake8-type-checking
"TID", # Tidy imports
"TRY", # tryceratops
"UP", # pyupgrade
@@ -146,12 +119,13 @@ ignore = [
# "PLC1901", # {existing} can be simplified to {replacement} as an empty string is falsey; too many false positives
# "PLR0911", # Too many return statements ({returns} > {max_returns})
# "PLR0912", # Too many branches ({branches} > {max_branches})
"PLR0913", # Too many arguments to function call ({c_args} > {max_args})
# "PLR0913", # Too many arguments to function call ({c_args} > {max_args})
# "PLR0915", # Too many statements ({statements} > {max_statements})
"PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable
# "PLW2901", # Outer {outer_kind} variable {name} overwritten by inner {inner_kind} target
# "PT004", # Fixture {fixture} does not return anything, add leading underscore
# "PT011", # pytest.raises({exception}) is too broad, set the `match` parameter or use a more specific exception
"PT018", # Assertion should be broken down into multiple parts
# "PT018", # Assertion should be broken down into multiple parts
# "RUF001", # String contains ambiguous unicode character.
# "RUF002", # Docstring contains ambiguous unicode character.
# "RUF003", # Comment contains ambiguous unicode character.
@@ -162,12 +136,14 @@ ignore = [
# "SIM115", # Use context handler for opening files
# Moving imports into type-checking blocks can mess with pytest.patch()
"TC001", # Move application import {} into a type-checking block
"TC002", # Move third-party import {} into a type-checking block
"TC003", # Move standard library import {} into a type-checking block
"TCH001", # Move application import {} into a type-checking block
"TCH002", # Move third-party import {} into a type-checking block
"TCH003", # Move standard library import {} into a type-checking block
"TRY003", # Avoid specifying long messages outside the exception class
"TRY400", # Use `logging.exception` instead of `logging.error`
# Ignored due to performance: https://github.com/charliermarsh/ruff/issues/2923
"UP038", # Use `X | Y` in `isinstance` call instead of `(X, Y)`
# May conflict with the formatter, https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
"W191",
@@ -185,14 +161,3 @@ ignore = [
"PLE0605"
]
[tool.ruff.lint.isort]
force-sort-within-sections = true
known-first-party = [
"homeassistant",
]
combine-as-imports = true
split-on-trailing-comma = false
[tool.ruff.lint.per-file-ignores]
"scripts/*" = ["T201"]

View File

@@ -1,6 +1,4 @@
homeassistant==2025.11.0
homeassistant==2024.11.0
pip>=21.3.1
ruff>=0.9.1,<=0.15.0
types-requests
mypy~=1.19.1
codespell
ruff==0.6.8

View File

@@ -2,9 +2,9 @@
wheel
home-assistant-bluetooth
habluetooth>=5.3.0
habluetooth>=3.6.0
bluetooth-adapters
pytest>=8.4.1
pytest>=8.3.3
pytest-cov>=5.0.0
pytest-socket>=0.7.0
pytest-asyncio>=0.24.0
@@ -16,10 +16,10 @@ aiohttp
aiohttp_cors
aiohttp-fast-url-dispatcher
aiohttp-zlib-ng
bleak>=1.0.1
bleak-retry-connector>=4.4.3
bleak>=0.22.3
bleak-retry-connector>=3.6.0
bluetooth-data-tools
pyserial-asyncio
pyudev
pytest-homeassistant-custom-component==0.13.294
pytest-homeassistant-custom-component==0.13.181

View File

@@ -1 +0,0 @@
"""Script to extract PowerView homekey from a G3 PowerView Gateway."""

View File

@@ -1,119 +0,0 @@
"""Extract PowerView homekey from a G3 PowerView Gateway."""
import base64
import json
import struct
from typing import Any, Final
import requests
HUB: Final[str] = "http://powerview-g3.local"
TIMEOUT: Final[int] = 10
def create_request(sid: int, cid: int, sequence_id: int, data: bytes) -> bytes:
"""Assemble a request frame for the PowerView protocol."""
return struct.pack("<BBBB", sid, cid, sequence_id, len(data)) + data
def decode_response(packet: bytes) -> dict[str, Any]:
"""Decode a response frame from the PowerView protocol."""
if len(packet) < 4:
raise ValueError("Packet size too small")
sid, cid, sequence_id, length = struct.unpack("<BBBB", packet[0:4])
if len(packet) != 4 + length:
raise ValueError("Not all data present")
if length < 1:
raise ValueError("No errorCode present")
(error_code,) = struct.unpack("<B", packet[4:5])
data: Final[bytes] = packet[5:]
return {
"cid": cid,
"sid": sid,
"sequenceId": sequence_id,
"errorCode": error_code,
"data": data,
}
def create_get_shade_key_request(sequence_id) -> bytes:
"""Create a GetShadeKey request frame."""
return create_request(251, 18, sequence_id, b"")
def get_shade_key(hub: str, ble_name) -> bytes:
"""Get the homekey for a shade."""
try:
shades_exec_resp: requests.Response = requests.post(
hub + "/home/shades/exec?shades=" + ble_name,
json={"hex": create_get_shade_key_request(1).hex()},
timeout=TIMEOUT,
)
shades_exec_resp.raise_for_status()
except requests.exceptions.RequestException as ex:
print(f"Unable to send GetShadeKey {ex!s}")
raise
result: dict = json.loads(shades_exec_resp.content)
responses = result.get("responses", [])
if len(responses) != 1 or "hex" not in responses[0]:
raise OSError(f"Error when attempting GetShadeKey: {result}")
response: Final[bytes] = bytes.fromhex(responses[0]["hex"])
dec_resp: Final[dict[str, Any]] = decode_response(response)
if dec_resp["errorCode"] != 0:
raise ValueError(
f"BLE errorCode={dec_resp['errorCode']} data={dec_resp['data'].hex()}"
)
if len(dec_resp["data"]) != 16:
raise ValueError("Expected 16 byte homekey")
return dec_resp["data"]
def main(hub: str) -> int:
"""Extract the homekeys from all shades."""
try:
shades_resp: requests.Response = requests.get(
hub + "/home/shades", timeout=TIMEOUT
)
shades_resp.raise_for_status()
except requests.exceptions.RequestException as ex:
print(f"Unable to get list of shades:\n\t{ex!s}")
return -1
shades = json.loads(shades_resp.content)
print(f"Found {len(shades)} shades, interrogating")
network_key: bytes | None = None
for shade in shades:
name: str = base64.b64decode(shade["name"]).decode("utf-8")
try:
key: bytes = get_shade_key(hub, shade["bleName"])
network_key = key
except (OSError, ValueError) as ex:
if network_key is not None:
key = network_key
print(f"Shade '{name}':")
print(f"\tBLE name: '{shade['bleName']}'")
print(f"\tHomeKey: {key.hex()} (shade unreachable, using network key)")
else:
print(f"Shade '{name}':")
print(f"\tBLE name: '{shade['bleName']}'")
print(f"\tHomeKey: ERROR - {ex}")
continue
print(f"Shade '{name}':")
print(f"\tBLE name: '{shade['bleName']}'")
print(f"\tHomeKey: {key.hex()}")
return 0
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser(
description="Extract PowerView homekey from a G3 PowerView Gateway"
)
parser.add_argument("hub", nargs="?", help="URL to HUB", default=HUB)
args = parser.parse_args()
sys.exit(main(**vars(args)))