Compare commits
49 Commits
feature/ty
...
hub-archit
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
deee1baad5 | ||
|
|
354d06b468 | ||
|
|
7936a4fb24 | ||
|
|
1b9aed4f89 | ||
|
|
8a6a17b767 | ||
|
|
0efca4ff52 | ||
|
|
789f716707 | ||
|
|
ce04907d58 | ||
|
|
87bac49529 | ||
|
|
e2bb3b5592 | ||
|
|
acb2c5ff52 | ||
|
|
7a2bf2193a | ||
|
|
f260416676 | ||
|
|
abb0a3e8a3 | ||
|
|
652337e32c | ||
|
|
af08d18d62 | ||
|
|
894580c20b | ||
|
|
317b450702 | ||
|
|
31185a4446 | ||
|
|
04c7036351 | ||
|
|
d66cf61887 | ||
|
|
5d498b8753 | ||
|
|
b558083b50 | ||
|
|
3775496936 | ||
|
|
883aca753e | ||
|
|
fe3646df27 | ||
|
|
bae4158a3c | ||
|
|
d9ebc54026 | ||
|
|
62bbfd7361 | ||
|
|
30dff09bb1 | ||
|
|
74d906151e | ||
|
|
9773e5df65 | ||
|
|
b2d5335e1d | ||
|
|
a6aaf4d727 | ||
|
|
f2ad61a016 | ||
|
|
f6ec17c9b2 | ||
|
|
d79096357d | ||
|
|
52f7390fc0 | ||
|
|
c9a27388af | ||
|
|
1590647c9e | ||
|
|
1559776aa8 | ||
|
|
2c03881b60 | ||
|
|
0bc5644883 | ||
|
|
e97eef94f8 | ||
|
|
4b51ea514f | ||
|
|
591815652d | ||
|
|
ba487d907d | ||
|
|
8b0bccee6b | ||
|
|
054f35f838 |
16
.github/workflows/lint.yml
vendored
16
.github/workflows/lint.yml
vendored
@@ -8,8 +8,8 @@ on:
|
||||
- cron: '0 5 * * 6'
|
||||
|
||||
jobs:
|
||||
ruff:
|
||||
name: "Ruff"
|
||||
lint:
|
||||
name: "Lint the code"
|
||||
runs-on: "ubuntu-latest"
|
||||
steps:
|
||||
- name: "Checkout the repository"
|
||||
@@ -18,11 +18,17 @@ jobs:
|
||||
- name: "Set up Python"
|
||||
uses: actions/setup-python@main
|
||||
with:
|
||||
python-version: "3.12"
|
||||
python-version: "3.13.2"
|
||||
cache: "pip"
|
||||
|
||||
- name: "Install requirements"
|
||||
run: python3 -m pip install -r requirements.txt
|
||||
|
||||
- name: "Run Ruff"
|
||||
run: python3 -m ruff check .
|
||||
- name: "Run ruff"
|
||||
run: ruff check .
|
||||
|
||||
- name: "Run mypy"
|
||||
run: mypy .
|
||||
|
||||
- name: "Run codespell"
|
||||
run: codespell -L hass
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -152,6 +152,3 @@ cython_debug/
|
||||
#.idea/
|
||||
aes.py
|
||||
*.bak
|
||||
emu/PV_BLE_cover/PV_BLE_cover.ino
|
||||
img/
|
||||
*.zip
|
||||
|
||||
39
README.md
39
README.md
@@ -8,8 +8,7 @@
|
||||
> [!WARNING]
|
||||
> - This integration is under development!
|
||||
> - Test coverage is low, malfunction might occur.
|
||||
> - Only devices that are **not** added to the app are controlable. It is possible to add them to the app if you just want to monitor the status (position, battery) in Home Assistant.
|
||||
> - Currently only position change is supported (e.g., no tilt)
|
||||
> - The HOME_KEY is lost over updates!
|
||||
|
||||
## Features
|
||||
- Zero configuration
|
||||
@@ -18,7 +17,7 @@
|
||||
### Supported Devices
|
||||
|
||||
Type* | Description
|
||||
-- | --
|
||||
-- | --
|
||||
1 | Designer Roller
|
||||
4 | Roman
|
||||
5 | Bottom Up
|
||||
@@ -26,6 +25,7 @@ Type* | Description
|
||||
10 | Duette and Applause SkyLift
|
||||
19 | Provenance Woven Wood
|
||||
31, 32, 84 | Vignette
|
||||
39 | Parkland
|
||||
42 | M25T Roller Blind
|
||||
49 | AC Roller
|
||||
52 | Banded Shades
|
||||
@@ -39,10 +39,14 @@ The integration provides the following information about the battery
|
||||
Platform | Description | Unit | Details
|
||||
-- | -- | -- | --
|
||||
`binary_sensor` | battery charging indicator | `bool` | true if battery is charging
|
||||
`button` | identify shade | - | identify shade by LED and 3 beeps
|
||||
`cover` | view/control position | `%` | percentage cover is open (100% is open)
|
||||
`sensor` | SoC (state of charge) | `%` | range 100% (full), 50%, 20%, 0% (battery empty)
|
||||
|
||||
## Installation
|
||||
> [!IMPORTANT]
|
||||
> In case you added your shades to the app or a gateway, you need to [set the encryption key](#set-the-encryption-key) manually in the [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py) file after **each** update!
|
||||
|
||||
### Automatic
|
||||
Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom repository](https://hacs.xyz/docs/faq/custom_repositories/).
|
||||
|
||||
@@ -57,12 +61,22 @@ Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom rep
|
||||
1. Restart Home Assistant
|
||||
1. In the HA UI go to "Configuration" -> "Integrations" click "+" and search for "Hunter Douglas PowerView (BLE)"
|
||||
|
||||
## Set the Encryption Key
|
||||
Currently, there are three methods to obtain the key:
|
||||
|
||||
## Outlook
|
||||
- Add support for encryption
|
||||
- Allow parallel usage to PowerView app as "remote"
|
||||
- Add support for tilt function
|
||||
- Add support for further device types
|
||||
1. Via adopting a BLE shade: There is a [shade emulator](/emu/PV_BLE_cover) that works with Arduino IDE and an ESP32 device (≥ 2MiB flash, ≥ 128KiB required), e.g. [Adafruit QT Py ESP32-S3](https://www.adafruit.com/product/5426). Install and connect via serial port, then go to the PowerView app and add the shade `myPVcover` to your home. You will see a log message `set shade key: \xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx` . Copy this key. You can delete the shade from the app when done.
|
||||
2. Extracting from gateway: This [script](scripts/extract_gateway3_homekey.py) is able to extract the key from a working PowerView gateway.
|
||||
3. Grabbing from the app: Checkout this [post in the Home Assistant community forum](https://community.home-assistant.io/t/hunter-douglas-powerview-gen-3-integration/424836/228).
|
||||
|
||||
Finally, you need to manually copy the key to [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py).
|
||||
|
||||
> [!IMPORTANT]
|
||||
> You need to update the file after **each** update!
|
||||
|
||||
## Known Issues
|
||||
<details><summary>Shade inoperable after charging</summary>
|
||||
It seems that the shades require some re-initialization after charging. The solution is currently unknown, but as a workaround you can operate the shade ones using the vendor app.
|
||||
</details>
|
||||
|
||||
## Troubleshooting
|
||||
In case you have severe troubles,
|
||||
@@ -72,6 +86,15 @@ In case you have severe troubles,
|
||||
- disable the log (Home Assistant will prompt you to download the log), and finally
|
||||
- [open an issue](https://github.com/patman15/hdpv_ble/issues/new?assignees=&labels=Bug&projects=&template=bug.yml) with a good description of what happened and attach the log.
|
||||
|
||||
# Thanks To
|
||||
[@mannkind](https://github.com/mannkind)
|
||||
|
||||
[license-shield]: https://img.shields.io/github/license/patman15/hdpv_ble.svg?style=for-the-badge
|
||||
[releases-shield]: https://img.shields.io/github/release/patman15/hdpv_ble.svg?style=for-the-badge
|
||||
[releases]: https://github.com//patman15/hdpv_ble/releases
|
||||
|
||||
## Outlook
|
||||
- Add tests!
|
||||
- Allow parallel usage to PowerView app as "remote"
|
||||
- Add support for tilt function
|
||||
- Add support for further device types
|
||||
|
||||
@@ -4,48 +4,194 @@
|
||||
@license: Apache-2.0 license
|
||||
"""
|
||||
|
||||
import base64
|
||||
from collections.abc import Callable
|
||||
|
||||
import aiohttp
|
||||
from bleak.backends.device import BLEDevice
|
||||
from bleak.exc import BleakError
|
||||
from homeassistant.components.bluetooth import async_ble_device_from_address
|
||||
|
||||
from homeassistant.components import bluetooth
|
||||
from homeassistant.components.bluetooth import (
|
||||
BluetoothCallbackMatcher,
|
||||
BluetoothScanningMode,
|
||||
BluetoothServiceInfoBleak,
|
||||
async_ble_device_from_address,
|
||||
async_discovered_service_info,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import Platform
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.dispatcher import (
|
||||
async_dispatcher_connect,
|
||||
async_dispatcher_send,
|
||||
)
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .api import UUID_COV_SERVICE as UUID
|
||||
from .const import CONF_HUB_URL, LOGGER, MFCT_ID, SIGNAL_NEW_SHADE
|
||||
from .coordinator import PVCoordinator
|
||||
|
||||
PLATFORMS: list[Platform] = [Platform.BINARY_SENSOR, Platform.COVER, Platform.SENSOR]
|
||||
PLATFORMS: list[Platform] = [
|
||||
Platform.BINARY_SENSOR,
|
||||
Platform.BUTTON,
|
||||
Platform.COVER,
|
||||
Platform.NUMBER,
|
||||
Platform.SENSOR,
|
||||
]
|
||||
|
||||
type ConfigEntryType = ConfigEntry[PVCoordinator]
|
||||
type HubRuntimeData = dict[str, PVCoordinator]
|
||||
type ConfigEntryType = ConfigEntry[HubRuntimeData]
|
||||
|
||||
type AddEntitiesFn = Callable[[PVCoordinator, AddEntitiesCallback], None]
|
||||
|
||||
|
||||
def async_setup_shade_platform(
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntryType,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
add_fn: AddEntitiesFn,
|
||||
) -> None:
|
||||
"""Set up a platform for all current and future shades."""
|
||||
for coordinator in config_entry.runtime_data.values():
|
||||
add_fn(coordinator, async_add_entities)
|
||||
|
||||
@callback
|
||||
def _async_new_shade(coordinator: PVCoordinator) -> None:
|
||||
add_fn(coordinator, async_add_entities)
|
||||
|
||||
config_entry.async_on_unload(
|
||||
async_dispatcher_connect(
|
||||
hass,
|
||||
SIGNAL_NEW_SHADE.format(entry_id=config_entry.entry_id),
|
||||
_async_new_shade,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
async def _fetch_shade_names(
|
||||
hass: HomeAssistant, hub_url: str
|
||||
) -> dict[str, str]:
|
||||
"""Fetch BLE name -> friendly name mapping from the hub.
|
||||
|
||||
Returns empty dict on failure.
|
||||
"""
|
||||
session = async_get_clientsession(hass)
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
try:
|
||||
async with session.get(f"{hub_url}/home/shades", timeout=timeout) as resp:
|
||||
resp.raise_for_status()
|
||||
shades = await resp.json(content_type=None)
|
||||
except (TimeoutError, aiohttp.ClientError, ValueError):
|
||||
return {}
|
||||
|
||||
names: dict[str, str] = {}
|
||||
for shade in shades or []:
|
||||
ble_name = shade.get("bleName", "")
|
||||
if not ble_name:
|
||||
continue
|
||||
name_b64 = shade.get("name", "")
|
||||
try:
|
||||
name = base64.b64decode(name_b64).decode("utf-8") if name_b64 else ble_name
|
||||
except Exception: # noqa: BLE001
|
||||
name = ble_name
|
||||
names[ble_name] = name
|
||||
return names
|
||||
|
||||
|
||||
async def _async_setup_shade(
|
||||
hass: HomeAssistant,
|
||||
entry: ConfigEntryType,
|
||||
service_info: BluetoothServiceInfoBleak,
|
||||
shade_names: dict[str, str],
|
||||
) -> None:
|
||||
"""Create a coordinator for a newly discovered shade."""
|
||||
address = service_info.address
|
||||
|
||||
if address in entry.runtime_data:
|
||||
return
|
||||
|
||||
ble_device: BLEDevice | None = async_ble_device_from_address(
|
||||
hass=hass, address=address, connectable=True
|
||||
)
|
||||
if not ble_device:
|
||||
LOGGER.debug("BLE device %s not connectable, skipping", address)
|
||||
return
|
||||
|
||||
friendly_name = shade_names.get(service_info.name, service_info.name)
|
||||
|
||||
coordinator = PVCoordinator(
|
||||
hass, ble_device, entry.data.copy(), friendly_name
|
||||
)
|
||||
|
||||
entry.runtime_data[address] = coordinator
|
||||
entry.async_on_unload(coordinator.async_start())
|
||||
|
||||
async_dispatcher_send(
|
||||
hass,
|
||||
SIGNAL_NEW_SHADE.format(entry_id=entry.entry_id),
|
||||
coordinator,
|
||||
)
|
||||
|
||||
# Query device info in background — don't block entry setup
|
||||
try:
|
||||
await coordinator.query_dev_info()
|
||||
except BleakError:
|
||||
LOGGER.warning(
|
||||
"Could not query device info for %s (%s)",
|
||||
friendly_name,
|
||||
address,
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool:
|
||||
"""Set up BT Battery Management System from a config entry."""
|
||||
"""Set up PowerView Home from a config entry."""
|
||||
LOGGER.debug("Setup of %s", repr(entry))
|
||||
|
||||
if entry.unique_id is None:
|
||||
raise ConfigEntryError("Missing unique ID for device.")
|
||||
entry.runtime_data = {}
|
||||
|
||||
ble_device = async_ble_device_from_address(
|
||||
hass=hass, address=entry.unique_id, connectable=True
|
||||
# Resolve shade friendly names from hub if available
|
||||
hub_url = entry.data.get(CONF_HUB_URL, "")
|
||||
shade_names: dict[str, str] = {}
|
||||
if hub_url:
|
||||
shade_names = await _fetch_shade_names(hass, hub_url)
|
||||
|
||||
# Forward platforms first so dispatched entities have their setup ready
|
||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
|
||||
# Kick off shade setup for already-discovered BLE devices (non-blocking)
|
||||
for service_info in async_discovered_service_info(hass, connectable=True):
|
||||
if (
|
||||
MFCT_ID in service_info.manufacturer_data
|
||||
and UUID in service_info.service_uuids
|
||||
):
|
||||
hass.async_create_task(
|
||||
_async_setup_shade(hass, entry, service_info, shade_names)
|
||||
)
|
||||
|
||||
# Register for future BLE discoveries
|
||||
def _async_discovered_device(
|
||||
service_info: BluetoothServiceInfoBleak,
|
||||
change: bluetooth.BluetoothChange,
|
||||
) -> None:
|
||||
if service_info.address not in entry.runtime_data:
|
||||
hass.async_create_task(
|
||||
_async_setup_shade(hass, entry, service_info, shade_names)
|
||||
)
|
||||
|
||||
entry.async_on_unload(
|
||||
bluetooth.async_register_callback(
|
||||
hass,
|
||||
_async_discovered_device,
|
||||
BluetoothCallbackMatcher(
|
||||
service_uuid=UUID,
|
||||
manufacturer_id=MFCT_ID,
|
||||
),
|
||||
BluetoothScanningMode.ACTIVE,
|
||||
)
|
||||
)
|
||||
|
||||
if not ble_device:
|
||||
raise ConfigEntryNotReady(
|
||||
f"Could not find PowerView device ({entry.unique_id}) via Bluetooth"
|
||||
)
|
||||
|
||||
coordinator = PVCoordinator(hass, ble_device, entry.data.copy())
|
||||
try:
|
||||
await coordinator.query_dev_info()
|
||||
except BleakError as err:
|
||||
raise ConfigEntryNotReady("Unable to query device info.") from err
|
||||
|
||||
# Insert the coordinator in the global registry
|
||||
hass.data.setdefault(DOMAIN, {})
|
||||
entry.runtime_data = coordinator
|
||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
entry.async_on_unload(coordinator.async_start())
|
||||
return True
|
||||
|
||||
|
||||
@@ -53,20 +199,8 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntryType) -> boo
|
||||
"""Unload a config entry."""
|
||||
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
|
||||
|
||||
if unload_ok:
|
||||
entry.runtime_data.clear()
|
||||
|
||||
LOGGER.debug("Unloaded config entry: %s, ok? %s!", entry.unique_id, str(unload_ok))
|
||||
return unload_ok
|
||||
|
||||
|
||||
async def async_migrate_entry(
|
||||
_hass: HomeAssistant, config_entry: ConfigEntryType
|
||||
) -> bool:
|
||||
"""Migrate old entry."""
|
||||
|
||||
if config_entry.version > 1:
|
||||
# This means the user has downgraded from a future version
|
||||
LOGGER.debug("Cannot downgrade from version %s", config_entry.version)
|
||||
return False
|
||||
|
||||
LOGGER.debug("Migrating from version %s", config_entry.version)
|
||||
|
||||
return False
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
"""Hunter Douglas PowerView BLE API."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Final
|
||||
import time
|
||||
from typing import Final, NamedTuple
|
||||
|
||||
from bleak import BleakClient
|
||||
from bleak.backends.device import BLEDevice
|
||||
@@ -16,6 +16,7 @@ from cryptography.hazmat.primitives.ciphers.base import (
|
||||
AEADDecryptionContext,
|
||||
AEADEncryptionContext,
|
||||
)
|
||||
|
||||
from homeassistant.components.cover import (
|
||||
ATTR_CURRENT_POSITION,
|
||||
ATTR_CURRENT_TILT_POSITION,
|
||||
@@ -39,23 +40,88 @@ SHADE_TYPE: Final[dict[int, str]] = {
|
||||
6: "Duette",
|
||||
10: "Duette and Applause SkyLift",
|
||||
19: "Provenance Woven Wood",
|
||||
26: "Vertical",
|
||||
31: "Vignette",
|
||||
32: "Vignette",
|
||||
42: "M25T Roller Blind",
|
||||
49: "AC Roller",
|
||||
52: "Banded Shades",
|
||||
53: "Sonnette",
|
||||
57: "Carole Roman Shades",
|
||||
84: "Vignette",
|
||||
# top down bottom up
|
||||
# top down (single rail, inverted position)
|
||||
7: "Top Down",
|
||||
# top down bottom up (dual rail)
|
||||
8: "Duette, Top Down Bottom Up",
|
||||
9: "Duette DuoLite, Top Down Bottom Up",
|
||||
33: "Duette Architella, Top Down Bottom Up",
|
||||
47: "Pleated, Top Down Bottom Up",
|
||||
# top down, tilt anywhere
|
||||
# tilt only (no position movement)
|
||||
39: "Parkland",
|
||||
40: "Everwood Alternative Wood Blinds",
|
||||
# tilt on closed
|
||||
18: "Bottom Up, Tilt on Closed 90°",
|
||||
44: "Twist",
|
||||
# tilt anywhere (position + tilt)
|
||||
51: "Venetian, Tilt Anywhere",
|
||||
54: "Vertical Slats, Left Stack",
|
||||
55: "Vertical Slats, Right Stack",
|
||||
56: "Vertical Slats, Split Stack",
|
||||
62: "Venetian, Tilt Anywhere",
|
||||
# duolite (dual overlapping fabrics)
|
||||
38: "Dual Overlapped, Tilt 90°",
|
||||
65: "Dual Overlapped",
|
||||
95: "Dual Overlapped Illuminated",
|
||||
}
|
||||
|
||||
class ShadeCapability(NamedTuple):
|
||||
"""Capability flags for a shade type."""
|
||||
|
||||
has_tilt: bool = False
|
||||
tilt_only: bool = False
|
||||
is_tilt_on_closed: bool = False # tilt only available when fully closed
|
||||
is_top_down: bool = False # position logic is inverted (SkyLift style)
|
||||
is_tdbu: bool = False # dual-rail Top Down Bottom Up (needs two entities)
|
||||
is_duolite: bool = False # dual-fabric sheer+opaque (needs three entities)
|
||||
|
||||
|
||||
SHADE_CAPABILITIES: Final[dict[int, ShadeCapability]] = {
|
||||
# tilt anywhere (position + tilt)
|
||||
51: ShadeCapability(has_tilt=True),
|
||||
54: ShadeCapability(has_tilt=True),
|
||||
55: ShadeCapability(has_tilt=True),
|
||||
56: ShadeCapability(has_tilt=True),
|
||||
62: ShadeCapability(has_tilt=True),
|
||||
# tilt only (no position movement)
|
||||
39: ShadeCapability(has_tilt=True, tilt_only=True),
|
||||
40: ShadeCapability(has_tilt=True, tilt_only=True),
|
||||
# tilt on closed (tilt only available at fully closed position)
|
||||
18: ShadeCapability(has_tilt=True, is_tilt_on_closed=True),
|
||||
44: ShadeCapability(has_tilt=True, is_tilt_on_closed=True),
|
||||
# top-down only (single rail, inverted position)
|
||||
7: ShadeCapability(is_top_down=True),
|
||||
10: ShadeCapability(is_top_down=True),
|
||||
# dual-rail top-down/bottom-up (two independent rails → two entities)
|
||||
8: ShadeCapability(is_tdbu=True),
|
||||
33: ShadeCapability(is_tdbu=True),
|
||||
47: ShadeCapability(is_tdbu=True),
|
||||
# duolite (dual overlapping fabrics → three entities)
|
||||
9: ShadeCapability(is_tdbu=True, is_duolite=True),
|
||||
38: ShadeCapability(is_duolite=True),
|
||||
65: ShadeCapability(is_duolite=True),
|
||||
95: ShadeCapability(is_duolite=True),
|
||||
}
|
||||
|
||||
_DEFAULT_CAPABILITY: Final[ShadeCapability] = ShadeCapability()
|
||||
|
||||
|
||||
def get_shade_capabilities(type_id: int | None) -> ShadeCapability:
|
||||
"""Return shade capabilities for a given type_id."""
|
||||
if type_id is None:
|
||||
return _DEFAULT_CAPABILITY
|
||||
return SHADE_CAPABILITIES.get(type_id, _DEFAULT_CAPABILITY)
|
||||
|
||||
|
||||
OPEN_POSITION: Final[int] = 100
|
||||
CLOSED_POSITION: Final[int] = 0
|
||||
|
||||
@@ -95,26 +161,18 @@ class PowerViewBLE:
|
||||
|
||||
def __init__(self, ble_device: BLEDevice, home_key: bytes = b"") -> None:
|
||||
"""Initialize device API via Bluetooth."""
|
||||
self._ble_device: Final[BLEDevice] = ble_device
|
||||
self._ble_device: BLEDevice = ble_device
|
||||
self.name: Final[str] = self._ble_device.name or "unknown"
|
||||
self._seqcnt: int = 1
|
||||
self._client: BleakClient = BleakClient(
|
||||
self._ble_device,
|
||||
disconnected_callback=self._on_disconnect,
|
||||
services=[
|
||||
UUID_COV_SERVICE,
|
||||
# self.UUID_DEV_SERVICE,
|
||||
# self.UUID_BAT_SERVICE,
|
||||
],
|
||||
)
|
||||
self._client: BleakClient = BleakClient(self._ble_device)
|
||||
self._data_event = asyncio.Event()
|
||||
self._data: bytearray = bytearray()
|
||||
self._data: bytes = b""
|
||||
self._info: PVDeviceInfo = PVDeviceInfo()
|
||||
self._is_encrypted: bool = False
|
||||
self._cmd_lock: Final = asyncio.Lock()
|
||||
self._cmd_next: tuple[ShadeCmd, bytes]
|
||||
self._is_encrypted: bool = False
|
||||
self._cipher: Final[Cipher | None] = (
|
||||
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
|
||||
Cipher(algorithms.AES(home_key), modes.CTR(bytes(16)))
|
||||
if len(home_key) == 16
|
||||
else None
|
||||
)
|
||||
@@ -123,6 +181,10 @@ class PowerViewBLE:
|
||||
await self._data_event.wait()
|
||||
self._data_event.clear()
|
||||
|
||||
def set_ble_device(self, ble_device: BLEDevice) -> None:
|
||||
"""Update the BLE device reference (e.g. when proxy details change)."""
|
||||
self._ble_device = ble_device
|
||||
|
||||
@property
|
||||
def encrypted(self) -> bool:
|
||||
"""Return whether communication with this shade is encrypted."""
|
||||
@@ -132,6 +194,11 @@ class PowerViewBLE:
|
||||
def encrypted(self, value: bool) -> None:
|
||||
self._is_encrypted = value
|
||||
|
||||
@property
|
||||
def has_key(self) -> bool:
|
||||
"""Return True if a valid homekey was provided."""
|
||||
return self._cipher is not None
|
||||
|
||||
@property
|
||||
def info(self) -> PVDeviceInfo:
|
||||
"""Return device information, e.g. SW version."""
|
||||
@@ -180,72 +247,81 @@ class PowerViewBLE:
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def dec_manufacturer_data(data: bytearray) -> list[tuple[str, float]]:
|
||||
def dec_manufacturer_data(data: bytearray) -> dict[str, float | int | bool]:
|
||||
"""Decode manufacturer data from BLE advertisement V2."""
|
||||
if len(data) != 9:
|
||||
LOGGER.debug("not a V2 record!")
|
||||
return []
|
||||
pos: Final[int] = int.from_bytes(data[3:5], byteorder="little")
|
||||
return {}
|
||||
# data[3] lower 2 bits are status flags; pos is in bits 2-7 of data[3]
|
||||
# and bits 0-3 of data[4]. Read flags before extracting position so
|
||||
# the masking below doesn't accidentally overwrite them.
|
||||
flags: Final[int] = data[3] & 0x3
|
||||
# Mask pos2 bits (upper nibble of data[4]) out before forming the
|
||||
# 10-bit position value, otherwise a non-zero top-rail position on
|
||||
# TDBU shades contaminates the bottom-rail reading.
|
||||
pos: Final[int] = ((data[4] & 0x0F) << 6) | ((data[3] >> 2) & 0x3F)
|
||||
pos2: Final[int] = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||
return [
|
||||
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
|
||||
("position2", pos2 >> 2),
|
||||
("position3", int(data[6])),
|
||||
(ATTR_CURRENT_TILT_POSITION, int(data[7])),
|
||||
("home_id", int.from_bytes(data[0:2], byteorder="little")),
|
||||
("type_id", int.from_bytes(data[2:3])),
|
||||
("is_opening", bool(pos & 0x3 == 0x2)),
|
||||
("is_closing", bool(pos & 0x3 == 0x1)),
|
||||
("battery_charging", bool(pos & 0x3 == 0x3)), # observed
|
||||
("battery_level", POWER_LEVELS[(data[8] >> 6)]), # cannot hit 4
|
||||
("resetMode", bool(data[8] & 0x1)),
|
||||
("resetClock", bool(data[8] & 0x2)),
|
||||
]
|
||||
return {
|
||||
ATTR_CURRENT_POSITION: pos / 10,
|
||||
"position2": pos2 >> 2,
|
||||
"position3": int(data[6]),
|
||||
ATTR_CURRENT_TILT_POSITION: int(data[7]),
|
||||
"home_id": int.from_bytes(data[0:2], byteorder="little"),
|
||||
"type_id": int(data[2]),
|
||||
"is_opening": bool(flags == 0x2),
|
||||
"is_closing": bool(flags == 0x1),
|
||||
"battery_charging": bool(flags == 0x3), # observed
|
||||
"battery_level": POWER_LEVELS[(data[8] >> 6)], # cannot hit 4
|
||||
"resetMode": bool(data[8] & 0x1),
|
||||
"resetClock": bool(data[8] & 0x2),
|
||||
}
|
||||
|
||||
# position cmd: uint16_t pos1, uint16_t pos2, uint16_t pos3, uint16_t tilt, uint8_t velocity
|
||||
async def set_position(
|
||||
self,
|
||||
pos1: int,
|
||||
pos2: int | None = None,
|
||||
pos3: int | None = None,
|
||||
tilt: int | None = None,
|
||||
pos2: int = 0x8000,
|
||||
pos3: int = 0x8000,
|
||||
tilt: int = 0x8000,
|
||||
velocity: int = 0x0,
|
||||
disconnect: bool = True,
|
||||
) -> None:
|
||||
"""Set position of device."""
|
||||
LOGGER.debug("%s setting position to %i, tilt %i", self.name, pos1, tilt)
|
||||
LOGGER.debug(
|
||||
"%s setting position to %i/%i/%i, tilt %i, velocity %s",
|
||||
self.name,
|
||||
pos1,
|
||||
pos2,
|
||||
pos3,
|
||||
tilt,
|
||||
velocity,
|
||||
)
|
||||
await self._cmd(
|
||||
(
|
||||
ShadeCmd.SET_POSITION,
|
||||
int.to_bytes(pos1, 2, byteorder="little")
|
||||
+ int.to_bytes(
|
||||
pos2 if pos2 is not None else 0x8000, 2, byteorder="little"
|
||||
)
|
||||
+ int.to_bytes(
|
||||
pos3 if pos3 is not None else 0x8000, 2, byteorder="little"
|
||||
)
|
||||
+ int.to_bytes(
|
||||
tilt if tilt is not None else 0x8000, 2, byteorder="little"
|
||||
)
|
||||
int.to_bytes(pos1 * 100, 2, byteorder="little")
|
||||
+ int.to_bytes(pos2, 2, byteorder="little")
|
||||
+ int.to_bytes(pos3, 2, byteorder="little")
|
||||
+ int.to_bytes(tilt, 2, byteorder="little")
|
||||
+ int.to_bytes(velocity, 1),
|
||||
),
|
||||
disconnect,
|
||||
)
|
||||
|
||||
async def open(self) -> None:
|
||||
async def open(self, velocity: int = 0x0) -> None:
|
||||
"""Fully open cover."""
|
||||
LOGGER.debug("%s open", self.name)
|
||||
await self.set_position(OPEN_POSITION, disconnect=False)
|
||||
await self.set_position(OPEN_POSITION, velocity=velocity, disconnect=False)
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop device movement."""
|
||||
LOGGER.debug("%s stop", self.name)
|
||||
await self._cmd((ShadeCmd.STOP, b""))
|
||||
|
||||
async def close(self) -> None:
|
||||
async def close(self, velocity: int = 0x0) -> None:
|
||||
"""Fully close cover."""
|
||||
LOGGER.debug("%s close", self.name)
|
||||
await self.set_position(CLOSED_POSITION, disconnect=False)
|
||||
await self.set_position(CLOSED_POSITION, velocity=velocity, disconnect=False)
|
||||
|
||||
# uint8_t scene#, uint8_t unknown
|
||||
# open: scene 2
|
||||
@@ -260,14 +336,15 @@ class PowerViewBLE:
|
||||
),
|
||||
)
|
||||
|
||||
def _verify_response(self, din: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
|
||||
async def identify(self, beeps: int = 0x3) -> None:
|
||||
"""Identify device."""
|
||||
LOGGER.debug("%s identify (%i)", self.name, beeps)
|
||||
await self._cmd((ShadeCmd.IDENTIFY, bytes([min(beeps, 0xFF)])))
|
||||
|
||||
def _verify_response(self, data: bytes, seq_nr: int, cmd: ShadeCmd) -> bool:
|
||||
"""Verify shade response data."""
|
||||
data: bytearray = din
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
dec: AEADDecryptionContext = self._cipher.decryptor()
|
||||
data = bytearray(dec.update(din) + dec.finalize())
|
||||
if len(data) < 4:
|
||||
LOGGER.error("Reponse message too short")
|
||||
LOGGER.error("Response message too short")
|
||||
return False
|
||||
if int.from_bytes(data[0:2], byteorder="little") != cmd.value & 0xFFEF:
|
||||
LOGGER.warning("Response to wrong command")
|
||||
@@ -323,10 +400,10 @@ class PowerViewBLE:
|
||||
|
||||
def _notification_handler(self, _sender, data: bytearray) -> None:
|
||||
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
|
||||
self._data = data
|
||||
self._data = bytes(data)
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
dec: AEADDecryptionContext = self._cipher.decryptor()
|
||||
self._data = bytearray(dec.update(data) + dec.finalize())
|
||||
self._data = bytes(dec.update(bytes(data)) + dec.finalize())
|
||||
LOGGER.debug(
|
||||
"%s %s",
|
||||
"decoded data: ".rjust(19 + len(self.name)),
|
||||
@@ -350,6 +427,7 @@ class PowerViewBLE:
|
||||
self._ble_device,
|
||||
self.name,
|
||||
disconnected_callback=self._on_disconnect,
|
||||
ble_device_callback=lambda: self._ble_device,
|
||||
services=[
|
||||
UUID_COV_SERVICE,
|
||||
UUID_DEV_SERVICE,
|
||||
|
||||
@@ -13,7 +13,7 @@ from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.device_registry import format_mac
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from . import ConfigEntryType
|
||||
from . import ConfigEntryType, async_setup_shade_platform
|
||||
from .const import DOMAIN
|
||||
from .coordinator import PVCoordinator
|
||||
|
||||
@@ -26,21 +26,30 @@ BINARY_SENSOR_TYPES: list[BinarySensorEntityDescription] = [
|
||||
]
|
||||
|
||||
|
||||
def _add_entities(
|
||||
coordinator: PVCoordinator, async_add_entities: AddEntitiesCallback
|
||||
) -> None:
|
||||
"""Create binary sensor entities for a single shade coordinator."""
|
||||
async_add_entities(
|
||||
[
|
||||
PVBinarySensor(coordinator, descr, format_mac(coordinator.address))
|
||||
for descr in BINARY_SENSOR_TYPES
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
_hass: HomeAssistant,
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntryType,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Add sensors for passed config_entry in Home Assistant."""
|
||||
|
||||
coord: PVCoordinator = config_entry.runtime_data
|
||||
for descr in BINARY_SENSOR_TYPES:
|
||||
async_add_entities(
|
||||
[PVBinarySensor(coord, descr, format_mac(config_entry.unique_id))]
|
||||
)
|
||||
async_setup_shade_platform(hass, config_entry, async_add_entities, _add_entities)
|
||||
|
||||
|
||||
class PVBinarySensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySensorEntity): # type: ignore[reportIncompatibleMethodOverride]
|
||||
class PVBinarySensor(
|
||||
PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySensorEntity
|
||||
): # type: ignore[reportIncompatibleMethodOverride]
|
||||
"""The generic PV binary sensor implementation."""
|
||||
|
||||
def __init__(
|
||||
@@ -49,7 +58,7 @@ class PVBinarySensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySen
|
||||
descr: BinarySensorEntityDescription,
|
||||
unique_id: str,
|
||||
) -> None:
|
||||
"""Intialize PV binary sensor."""
|
||||
"""Initialize PV binary sensor."""
|
||||
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
|
||||
self._attr_device_info = coord.device_info
|
||||
self._attr_has_entity_name = True
|
||||
|
||||
71
custom_components/hunterdouglas_powerview_ble/button.py
Normal file
71
custom_components/hunterdouglas_powerview_ble/button.py
Normal file
@@ -0,0 +1,71 @@
|
||||
"""Hunter Douglas Powerview cover."""
|
||||
|
||||
from typing import Final
|
||||
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
PassiveBluetoothCoordinatorEntity,
|
||||
)
|
||||
from homeassistant.components.button import (
|
||||
ButtonDeviceClass,
|
||||
ButtonEntity,
|
||||
ButtonEntityDescription,
|
||||
)
|
||||
from homeassistant.const import EntityCategory
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.device_registry import format_mac
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from . import ConfigEntryType, async_setup_shade_platform
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .coordinator import PVCoordinator
|
||||
|
||||
BUTTONS_SHADE: Final = [
|
||||
ButtonEntityDescription(
|
||||
key="identify",
|
||||
device_class=ButtonDeviceClass.IDENTIFY,
|
||||
entity_category=EntityCategory.DIAGNOSTIC,
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def _add_entities(
|
||||
coordinator: PVCoordinator, async_add_entities: AddEntitiesCallback
|
||||
) -> None:
|
||||
"""Create button entities for a single shade coordinator."""
|
||||
async_add_entities(
|
||||
[PowerViewButton(coordinator, descr) for descr in BUTTONS_SHADE]
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntryType,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up the button platform."""
|
||||
async_setup_shade_platform(hass, config_entry, async_add_entities, _add_entities)
|
||||
|
||||
|
||||
class PowerViewButton(PassiveBluetoothCoordinatorEntity[PVCoordinator], ButtonEntity): # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Representation of a powerview shade."""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: PVCoordinator,
|
||||
description: ButtonEntityDescription,
|
||||
) -> None:
|
||||
"""Initialize the shade."""
|
||||
self.entity_description = description
|
||||
self._coord: PVCoordinator = coordinator
|
||||
self._attr_device_info = self._coord.device_info
|
||||
self._attr_unique_id = (
|
||||
f"{DOMAIN}_{format_mac(self._coord.address)}_{ButtonDeviceClass.IDENTIFY}"
|
||||
)
|
||||
super().__init__(coordinator)
|
||||
|
||||
async def async_press(self) -> None:
|
||||
"""Handle the button press."""
|
||||
LOGGER.debug("identify cover")
|
||||
await self._coord.api.identify()
|
||||
@@ -1,42 +1,253 @@
|
||||
"""Config flow for BLE Battery Management System integration."""
|
||||
"""Config flow for Hunter Douglas PowerView BLE integration."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
import hashlib
|
||||
import struct
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.bluetooth import (
|
||||
BluetoothServiceInfoBleak,
|
||||
async_discovered_service_info,
|
||||
)
|
||||
from homeassistant.components.bluetooth import BluetoothServiceInfoBleak
|
||||
from homeassistant.config_entries import ConfigFlowResult
|
||||
from homeassistant.const import CONF_ADDRESS
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.selector import (
|
||||
SelectOptionDict,
|
||||
SelectSelector,
|
||||
SelectSelectorConfig,
|
||||
TextSelector,
|
||||
TextSelectorConfig,
|
||||
TextSelectorType,
|
||||
)
|
||||
|
||||
# from homeassistant.helpers.device_registry import format_mac
|
||||
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
|
||||
from .const import CONF_HOME_KEY, CONF_HUB_URL, DOMAIN, LOGGER, MFCT_ID
|
||||
|
||||
from .api import UUID_COV_SERVICE as UUID
|
||||
from .const import DOMAIN, LOGGER, MFCT_ID
|
||||
|
||||
def _hub_unique_id(home_key: str) -> str:
|
||||
"""Derive a stable unique ID for a hub entry from the home key."""
|
||||
if home_key:
|
||||
digest = hashlib.sha256(home_key.encode()).hexdigest()[:16]
|
||||
return f"pvhome_{digest}"
|
||||
return "pvhome_unencrypted"
|
||||
|
||||
|
||||
def _parse_key_response(ble_name: str, result: dict) -> bytes | None: # noqa: PLR0911
|
||||
"""Parse a shade exec response and return the 16-byte key, or None."""
|
||||
if result.get("err"):
|
||||
err_msg = (result.get("responses") or [{}])[0].get("errMsg", "unknown")
|
||||
LOGGER.warning(
|
||||
"Shade %s: hub BLE command failed (err=%s: %s)",
|
||||
ble_name,
|
||||
result["err"],
|
||||
err_msg,
|
||||
)
|
||||
return None
|
||||
|
||||
responses = result.get("responses", [])
|
||||
if len(responses) != 1 or "hex" not in responses[0]:
|
||||
LOGGER.warning(
|
||||
"Shade %s returned unexpected response structure: %s",
|
||||
ble_name,
|
||||
result,
|
||||
)
|
||||
return None
|
||||
|
||||
response_bytes = bytes.fromhex(responses[0]["hex"])
|
||||
if len(response_bytes) < 5:
|
||||
LOGGER.warning(
|
||||
"Shade %s response too short (%d bytes)", ble_name, len(response_bytes)
|
||||
)
|
||||
return None
|
||||
_s, _c, _q, length = struct.unpack("<BBBB", response_bytes[0:4])
|
||||
if len(response_bytes) != 4 + length:
|
||||
LOGGER.warning(
|
||||
"Shade %s frame length mismatch (header=%d, actual=%d)",
|
||||
ble_name,
|
||||
4 + length,
|
||||
len(response_bytes),
|
||||
)
|
||||
return None
|
||||
if response_bytes[4] != 0:
|
||||
LOGGER.warning(
|
||||
"Shade %s returned error status %d", ble_name, response_bytes[4]
|
||||
)
|
||||
return None
|
||||
key_data = response_bytes[5:]
|
||||
if len(key_data) != 16:
|
||||
LOGGER.warning(
|
||||
"Shade %s returned key of wrong length (%d, expected 16)",
|
||||
ble_name,
|
||||
len(key_data),
|
||||
)
|
||||
return None
|
||||
return key_data
|
||||
|
||||
|
||||
async def _fetch_key_from_hub(
|
||||
hass: HomeAssistant, hub_url: str
|
||||
) -> bytes:
|
||||
"""Fetch 16-byte homekey from a PowerView G3 hub.
|
||||
|
||||
Tries each shade on the hub until one returns a valid key.
|
||||
The key is network-wide so any reachable shade returns the same value.
|
||||
|
||||
The hub must establish a BLE connection to each shade before it can proxy
|
||||
the key request. On the first pass that connection is often not yet open,
|
||||
so the hub returns an error immediately. A second pass (after a short
|
||||
pause to let the hub complete its BLE connections) reliably succeeds.
|
||||
|
||||
Raises ValueError on protocol/key errors.
|
||||
Raises aiohttp.ClientError on network errors.
|
||||
Raises asyncio.TimeoutError on timeout.
|
||||
"""
|
||||
session = async_get_clientsession(hass)
|
||||
timeout = aiohttp.ClientTimeout(total=10)
|
||||
|
||||
async with session.get(f"{hub_url}/home/shades", timeout=timeout) as resp:
|
||||
resp.raise_for_status()
|
||||
shades = await resp.json(content_type=None)
|
||||
|
||||
if not shades:
|
||||
raise ValueError("No shades found on the hub")
|
||||
|
||||
# Sort by signal strength (strongest first) — a stronger signal means the
|
||||
# hub is more likely to have an active BLE connection to that shade.
|
||||
shades.sort(key=lambda s: s.get("signalStrength", -100), reverse=True)
|
||||
ble_names = [s["bleName"] for s in shades if s.get("bleName")]
|
||||
if not ble_names:
|
||||
raise ValueError("No BLE-capable shades found on the hub")
|
||||
|
||||
# GetShadeKey BLE request: sid=251, cid=18, seqId=1, data_len=0
|
||||
request_frame = struct.pack("<BBBB", 251, 18, 1, 0)
|
||||
|
||||
last_error: Exception = ValueError("No shades responded")
|
||||
for ble_name in ble_names:
|
||||
try:
|
||||
async with session.post(
|
||||
f"{hub_url}/home/shades/exec?shades={ble_name}",
|
||||
json={"hex": request_frame.hex()},
|
||||
timeout=timeout,
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
result = await resp.json(content_type=None)
|
||||
except (TimeoutError, aiohttp.ClientError) as ex:
|
||||
LOGGER.warning("Shade %s unreachable: %s", ble_name, ex)
|
||||
last_error = ex
|
||||
continue
|
||||
|
||||
key_data = _parse_key_response(ble_name, result)
|
||||
if key_data is not None:
|
||||
return key_data
|
||||
|
||||
raise ValueError(f"No reachable shade returned a valid key: {last_error}")
|
||||
|
||||
|
||||
_HOMEKEY_SCHEMA = vol.Schema(
|
||||
{
|
||||
vol.Required("key_method", default="hub"): SelectSelector(
|
||||
SelectSelectorConfig(
|
||||
options=[
|
||||
SelectOptionDict(
|
||||
value="hub",
|
||||
label="Fetch automatically from PowerView hub",
|
||||
),
|
||||
SelectOptionDict(
|
||||
value="manual",
|
||||
label="Enter key manually (32 hex characters)",
|
||||
),
|
||||
SelectOptionDict(
|
||||
value="skip",
|
||||
label="Skip (no key — controls disabled for encrypted shades)",
|
||||
),
|
||||
]
|
||||
)
|
||||
),
|
||||
vol.Optional(CONF_HUB_URL, default="http://powerview-g3.local"): TextSelector(
|
||||
TextSelectorConfig(type=TextSelectorType.URL)
|
||||
),
|
||||
vol.Optional("home_key", default=""): TextSelector(
|
||||
TextSelectorConfig(type=TextSelectorType.TEXT)
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for BT Battery Management System."""
|
||||
"""Handle a config flow for Hunter Douglas PowerView BLE."""
|
||||
|
||||
VERSION = 1
|
||||
MINOR_VERSION = 0
|
||||
|
||||
@dataclass
|
||||
class DiscoveredDevice:
|
||||
"""A discovered bluetooth device."""
|
||||
|
||||
name: str
|
||||
discovery_info: BluetoothServiceInfoBleak
|
||||
VERSION = 2
|
||||
MINOR_VERSION = 1
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the config flow."""
|
||||
self._home_key: str = ""
|
||||
self._hub_url: str = ""
|
||||
|
||||
self._discovered_device: ConfigFlow.DiscoveredDevice | None = None
|
||||
self._discovered_devices: dict[str, ConfigFlow.DiscoveredDevice] = {}
|
||||
def _create_entry(self) -> ConfigFlowResult:
|
||||
"""Create the hub config entry."""
|
||||
data: dict[str, str] = {CONF_HOME_KEY: self._home_key}
|
||||
if self._hub_url:
|
||||
data[CONF_HUB_URL] = self._hub_url
|
||||
return self.async_create_entry(title="PowerView Home", data=data)
|
||||
|
||||
def _validate_manual_key(
|
||||
self, user_input: dict[str, Any], errors: dict[str, str]
|
||||
) -> bool:
|
||||
"""Validate a manually entered hex key.
|
||||
|
||||
Returns True on success, False on validation error.
|
||||
"""
|
||||
raw = user_input.get("home_key", "").strip()
|
||||
if "\\x" in raw:
|
||||
raw = raw.replace("\\x", "")
|
||||
if len(raw) != 32:
|
||||
errors["home_key"] = "invalid_key_length"
|
||||
return False
|
||||
try:
|
||||
bytes.fromhex(raw)
|
||||
except ValueError:
|
||||
errors["home_key"] = "invalid_key_format"
|
||||
return False
|
||||
self._home_key = raw.lower()
|
||||
return True
|
||||
|
||||
async def _validate_homekey_input(
|
||||
self, user_input: dict[str, Any], errors: dict[str, str]
|
||||
) -> bool:
|
||||
"""Parse and validate homekey user_input, populating self state.
|
||||
|
||||
Returns True on success, False on validation error (errors dict is populated).
|
||||
"""
|
||||
method = user_input.get("key_method", "skip")
|
||||
|
||||
if method == "skip":
|
||||
self._home_key = ""
|
||||
return True
|
||||
|
||||
if method == "manual":
|
||||
return self._validate_manual_key(user_input, errors)
|
||||
|
||||
if method != "hub":
|
||||
return False
|
||||
|
||||
hub_url = user_input.get(CONF_HUB_URL, "").rstrip("/")
|
||||
_HUB_ERROR_MAP: dict[type[Exception], str] = {
|
||||
aiohttp.ClientResponseError: "hub_http_error",
|
||||
aiohttp.ClientConnectionError: "hub_connection_error",
|
||||
TimeoutError: "hub_timeout",
|
||||
ValueError: "hub_protocol_error",
|
||||
}
|
||||
try:
|
||||
key = await _fetch_key_from_hub(self.hass, hub_url)
|
||||
except tuple(_HUB_ERROR_MAP) as ex:
|
||||
LOGGER.warning("Hub key fetch failed: %s", ex)
|
||||
errors[CONF_HUB_URL] = _HUB_ERROR_MAP[type(ex)]
|
||||
return False
|
||||
|
||||
self._home_key = key.hex()
|
||||
self._hub_url = hub_url
|
||||
return True
|
||||
|
||||
async def async_step_bluetooth(
|
||||
self, discovery_info: BluetoothServiceInfoBleak
|
||||
@@ -44,84 +255,59 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a flow initialized by Bluetooth discovery."""
|
||||
LOGGER.debug("Bluetooth device detected: %s", discovery_info)
|
||||
|
||||
await self.async_set_unique_id(discovery_info.address)
|
||||
# Derive a home-wide unique ID from the home_id embedded in the BLE
|
||||
# advertisement (bytes 0-1 of the manufacturer payload). All shades on
|
||||
# the same network share the same home_id, so HA deduplicates every
|
||||
# subsequent shade discovery into this single flow via
|
||||
# "already_in_progress" rather than spawning one notification per shade.
|
||||
mfr_data = bytearray(
|
||||
discovery_info.manufacturer_data.get(MFCT_ID, b"")
|
||||
)
|
||||
if len(mfr_data) >= 2:
|
||||
home_id = int.from_bytes(mfr_data[0:2], byteorder="little")
|
||||
unique_id = f"pvhome_{home_id}"
|
||||
else:
|
||||
unique_id = DOMAIN
|
||||
|
||||
await self.async_set_unique_id(unique_id)
|
||||
self._abort_if_unique_id_configured()
|
||||
|
||||
self._discovered_device = ConfigFlow.DiscoveredDevice(
|
||||
discovery_info.name, discovery_info
|
||||
)
|
||||
self.context["title_placeholders"] = {"name": self._discovered_device.name}
|
||||
return await self.async_step_bluetooth_confirm()
|
||||
# If a hub entry already exists (unique_id may differ), shades are
|
||||
# auto-discovered internally — nothing more for the user to do.
|
||||
for entry in self._async_current_entries():
|
||||
if entry.version >= 2:
|
||||
return self.async_abort(reason="already_configured")
|
||||
|
||||
async def async_step_bluetooth_confirm(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Confirm bluetooth device discovery."""
|
||||
assert self._discovered_device is not None
|
||||
LOGGER.debug("confirm step for %s", self._discovered_device.name)
|
||||
|
||||
if user_input is not None:
|
||||
return self.async_create_entry(
|
||||
title=self._discovered_device.name,
|
||||
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
|
||||
)
|
||||
|
||||
self._set_confirm_only()
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="bluetooth_confirm",
|
||||
description_placeholders={"name": self._discovered_device.name},
|
||||
)
|
||||
# No hub entry yet — redirect to user setup
|
||||
return await self.async_step_user()
|
||||
|
||||
async def async_step_user(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle the user step to pick discovered device."""
|
||||
"""Handle the user step — create a hub entry."""
|
||||
LOGGER.debug("user step")
|
||||
|
||||
if user_input is not None:
|
||||
address = user_input[CONF_ADDRESS]
|
||||
await self.async_set_unique_id(address, raise_on_progress=False)
|
||||
# Only one hub entry allowed (per key, but for simplicity one total)
|
||||
for entry in self._async_current_entries():
|
||||
if entry.version >= 2:
|
||||
return self.async_abort(reason="single_instance_allowed")
|
||||
|
||||
errors: dict[str, str] = {}
|
||||
|
||||
if user_input is not None and await self._validate_homekey_input(
|
||||
user_input, errors
|
||||
):
|
||||
unique_id = _hub_unique_id(self._home_key)
|
||||
await self.async_set_unique_id(unique_id, raise_on_progress=False)
|
||||
self._abort_if_unique_id_configured()
|
||||
self._discovered_device = self._discovered_devices[address]
|
||||
|
||||
self.context["title_placeholders"] = {"name": self._discovered_device.name}
|
||||
|
||||
return self.async_create_entry(
|
||||
title=self._discovered_device.name,
|
||||
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
|
||||
)
|
||||
|
||||
current_addresses = self._async_current_ids()
|
||||
for discovery_info in async_discovered_service_info(self.hass, False):
|
||||
address = discovery_info.address
|
||||
if address in current_addresses or address in self._discovered_devices:
|
||||
continue
|
||||
|
||||
if MFCT_ID not in discovery_info.manufacturer_data:
|
||||
continue
|
||||
|
||||
if UUID not in discovery_info.service_uuids:
|
||||
continue
|
||||
|
||||
self._discovered_devices[address] = ConfigFlow.DiscoveredDevice(
|
||||
discovery_info.name, discovery_info
|
||||
)
|
||||
|
||||
if not self._discovered_devices:
|
||||
return self.async_abort(reason="no_devices_found")
|
||||
|
||||
titles = []
|
||||
for address, discovery in self._discovered_devices.items():
|
||||
titles.append({"value": address, "label": discovery.name})
|
||||
return self._create_entry()
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_ADDRESS): SelectSelector(
|
||||
SelectSelectorConfig(options=titles)
|
||||
)
|
||||
}
|
||||
),
|
||||
data_schema=_HOMEKEY_SCHEMA,
|
||||
errors=errors,
|
||||
description_placeholders={
|
||||
"hub_url_example": "http://powerview-g3.local",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -3,25 +3,16 @@
|
||||
import logging
|
||||
from typing import Final
|
||||
|
||||
# from bleak.uuids import normalize_uuid_str
|
||||
|
||||
# from homeassistant.const import ( # noqa: F401
|
||||
# ATTR_BATTERY_CHARGING,
|
||||
# ATTR_BATTERY_LEVEL,
|
||||
# ATTR_TEMPERATURE,
|
||||
# ATTR_VOLTAGE,
|
||||
# )
|
||||
|
||||
|
||||
DOMAIN: Final[str] = "hunterdouglas_powerview_ble"
|
||||
LOGGER: Final = logging.getLogger(__package__)
|
||||
MFCT_ID: Final[int] = 2073
|
||||
TIMEOUT: Final[int] = 5
|
||||
|
||||
# put the key here, needs to be 16 bytes long, e.g.
|
||||
# HOME_KEY: Final[bytes] = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
|
||||
HOME_KEY: Final[bytes] = b""
|
||||
CONF_HOME_KEY: Final[str] = "home_key"
|
||||
CONF_HUB_URL: Final[str] = "hub_url"
|
||||
|
||||
# dispatcher signal for newly discovered shades (format with entry_id)
|
||||
SIGNAL_NEW_SHADE: Final[str] = f"{DOMAIN}_new_shade_{{entry_id}}"
|
||||
|
||||
# attributes (do not change)
|
||||
ATTR_RSSI: Final[str] = "rssi"
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
from typing import Any
|
||||
|
||||
from bleak.backends.device import BLEDevice
|
||||
|
||||
from homeassistant.components import bluetooth
|
||||
from homeassistant.components.bluetooth.const import DOMAIN as BLUETOOTH_DOMAIN
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
@@ -11,27 +12,36 @@ from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers.device_registry import CONNECTION_BLUETOOTH, DeviceInfo
|
||||
|
||||
from .api import SHADE_TYPE, PowerViewBLE
|
||||
from .const import ATTR_RSSI, DOMAIN, HOME_KEY, LOGGER
|
||||
from .api import SHADE_TYPE, PowerViewBLE, ShadeCapability, get_shade_capabilities
|
||||
from .const import ATTR_RSSI, CONF_HOME_KEY, DOMAIN, LOGGER
|
||||
|
||||
|
||||
class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
|
||||
"""Update coordinator for a battery management system."""
|
||||
|
||||
def __init__(
|
||||
self, hass: HomeAssistant, ble_device: BLEDevice, data: dict[str, Any]
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
ble_device: BLEDevice,
|
||||
data: dict[str, Any],
|
||||
friendly_name: str | None = None,
|
||||
) -> None:
|
||||
"""Initialize BMS data coordinator."""
|
||||
assert ble_device.name is not None
|
||||
self._mac = ble_device.address
|
||||
self.api = PowerViewBLE(ble_device, HOME_KEY)
|
||||
self._friendly_name = friendly_name or ble_device.name
|
||||
home_key_hex: str = data.get(CONF_HOME_KEY, "")
|
||||
home_key: bytes = (
|
||||
bytes.fromhex(home_key_hex) if len(home_key_hex) == 32 else b""
|
||||
)
|
||||
self.api = PowerViewBLE(ble_device, home_key)
|
||||
self.data: dict[str, int | float | bool] = {}
|
||||
self._manuf_dat = data.get("manufacturer_data")
|
||||
self.dev_details: dict[str, str] = {}
|
||||
self.velocity: int = 0
|
||||
|
||||
LOGGER.debug(
|
||||
"Initializing coordinator for %s (%s)",
|
||||
ble_device.name,
|
||||
self._friendly_name,
|
||||
ble_device.address,
|
||||
)
|
||||
super().__init__(
|
||||
@@ -41,6 +51,19 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
|
||||
bluetooth.BluetoothScanningMode.ACTIVE,
|
||||
)
|
||||
|
||||
@property
|
||||
def type_id(self) -> int | None:
|
||||
"""Return the shade type ID from manufacturer data or live BLE data."""
|
||||
if self._manuf_dat:
|
||||
return int(bytes.fromhex(self._manuf_dat)[2])
|
||||
live = self.data.get("type_id")
|
||||
return int(live) if live is not None else None
|
||||
|
||||
@property
|
||||
def shade_capabilities(self) -> ShadeCapability:
|
||||
"""Return the shade capabilities based on type ID."""
|
||||
return get_shade_capabilities(self.type_id)
|
||||
|
||||
async def query_dev_info(self) -> None:
|
||||
"""Receive detailed information from device."""
|
||||
LOGGER.debug("%s: querying device info", self.name)
|
||||
@@ -49,24 +72,23 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
|
||||
@property
|
||||
def device_info(self) -> DeviceInfo:
|
||||
"""Return detailed device information for GUI."""
|
||||
LOGGER.debug("%s: device_info, %s", self.name, self.dev_details)
|
||||
LOGGER.debug("%s: device_info, %s", self._friendly_name, self.dev_details)
|
||||
return DeviceInfo(
|
||||
identifiers={
|
||||
(DOMAIN, self.name),
|
||||
(DOMAIN, self.address),
|
||||
(BLUETOOTH_DOMAIN, self.address),
|
||||
},
|
||||
connections={(CONNECTION_BLUETOOTH, self.address)},
|
||||
name=self.name,
|
||||
name=self._friendly_name,
|
||||
configuration_url=None,
|
||||
# properties used in GUI:
|
||||
manufacturer="Hunter Douglas",
|
||||
model=(
|
||||
str(SHADE_TYPE.get(int(bytes.fromhex(self._manuf_dat)[2]), "unknown"))
|
||||
if self._manuf_dat
|
||||
str(SHADE_TYPE.get(self.type_id, "unknown"))
|
||||
if self.type_id is not None
|
||||
else None
|
||||
),
|
||||
model_id=(
|
||||
str(bytes.fromhex(self._manuf_dat)[2]) if self._manuf_dat else None
|
||||
str(self.type_id) if self.type_id is not None else None
|
||||
),
|
||||
serial_number=self.dev_details.get("serial_nr"),
|
||||
sw_version=self.dev_details.get("sw_rev"),
|
||||
@@ -76,11 +98,11 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
|
||||
@property
|
||||
def device_present(self) -> bool:
|
||||
"""Check if a device is present."""
|
||||
return bluetooth.async_address_present(self.hass, self._mac, connectable=True)
|
||||
return bluetooth.async_address_present(self.hass, self.address, connectable=True)
|
||||
|
||||
def _async_stop(self) -> None:
|
||||
"""Shutdown coordinator and any connection."""
|
||||
LOGGER.debug("%s: shuting down BMS device", self.name)
|
||||
LOGGER.debug("%s: shutting down BMS device", self.name)
|
||||
self.hass.async_create_task(self.api.disconnect())
|
||||
super()._async_stop()
|
||||
|
||||
@@ -92,18 +114,19 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
|
||||
) -> None:
|
||||
"""Handle a Bluetooth event."""
|
||||
|
||||
# if not self.dev_details:
|
||||
# self.hass.async_create_task(self._get_device_info())
|
||||
|
||||
LOGGER.debug("BLE event %s: %s", change, service_info.manufacturer_data)
|
||||
self.data = {ATTR_RSSI: service_info.rssi}
|
||||
self.api.set_ble_device(service_info.device)
|
||||
new_data: dict[str, int | float | bool] = {ATTR_RSSI: service_info.rssi}
|
||||
if change == bluetooth.BluetoothChange.ADVERTISEMENT:
|
||||
self.data.update(
|
||||
new_data.update(
|
||||
self.api.dec_manufacturer_data(
|
||||
bytearray(service_info.manufacturer_data.get(2073, b""))
|
||||
)
|
||||
)
|
||||
self.api.encrypted = bool(self.data.get("home_id"))
|
||||
self.api.encrypted = bool(new_data.get("home_id"))
|
||||
|
||||
if new_data == self.data:
|
||||
return
|
||||
self.data = new_data
|
||||
LOGGER.debug("data sample %s", self.data)
|
||||
super()._async_handle_bluetooth_event(service_info, change)
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
from typing import Any, Final
|
||||
|
||||
from bleak.exc import BleakError
|
||||
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
PassiveBluetoothCoordinatorEntity,
|
||||
)
|
||||
@@ -15,29 +16,43 @@ from homeassistant.components.cover import (
|
||||
CoverEntity,
|
||||
CoverEntityFeature,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.device_registry import DeviceInfo, format_mac
|
||||
from homeassistant.helpers.device_registry import format_mac
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from . import ConfigEntryType, async_setup_shade_platform
|
||||
from .api import CLOSED_POSITION, OPEN_POSITION
|
||||
from .const import DOMAIN, HOME_KEY, LOGGER
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .coordinator import PVCoordinator
|
||||
|
||||
|
||||
def _add_entities(
|
||||
coordinator: PVCoordinator, async_add_entities: AddEntitiesCallback
|
||||
) -> None:
|
||||
"""Create cover entities for a single shade coordinator."""
|
||||
caps = coordinator.shade_capabilities
|
||||
|
||||
if caps.tilt_only:
|
||||
entities: list[PowerViewCover] = [PowerViewCoverTiltOnly(coordinator)]
|
||||
elif caps.is_tilt_on_closed:
|
||||
entities = [PowerViewCoverTiltOnClosed(coordinator)]
|
||||
elif caps.has_tilt:
|
||||
entities = [PowerViewCoverTilt(coordinator)]
|
||||
elif caps.is_top_down:
|
||||
entities = [PowerViewCoverTopDown(coordinator)]
|
||||
else:
|
||||
entities = [PowerViewCover(coordinator)]
|
||||
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
_hass: HomeAssistant,
|
||||
config_entry: ConfigEntry,
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntryType,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up the demo cover platform."""
|
||||
|
||||
coordinator: PVCoordinator = config_entry.runtime_data
|
||||
async_add_entities(
|
||||
[PowerViewCoverTilt(coordinator)]
|
||||
if coordinator.dev_details.get("model") in ["51", "62"]
|
||||
else [PowerViewCover(coordinator)]
|
||||
)
|
||||
"""Set up the cover platform."""
|
||||
async_setup_shade_platform(hass, config_entry, async_add_entities, _add_entities)
|
||||
|
||||
|
||||
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
|
||||
@@ -58,7 +73,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
) -> None:
|
||||
"""Initialize the shade."""
|
||||
LOGGER.debug("%s: init() PowerViewCover", coordinator.name)
|
||||
self._attr_name = CoverDeviceClass.SHADE
|
||||
self._attr_name = None
|
||||
self._coord: PVCoordinator = coordinator
|
||||
self._attr_device_info = self._coord.device_info
|
||||
self._target_position: int | None = round(
|
||||
@@ -69,11 +84,6 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
)
|
||||
super().__init__(coordinator)
|
||||
|
||||
@property
|
||||
def device_info(self) -> DeviceInfo: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return the device_info of the device."""
|
||||
return self._coord.device_info
|
||||
|
||||
@property
|
||||
def is_opening(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return if the cover is opening or not."""
|
||||
@@ -102,9 +112,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
@property
|
||||
def supported_features(self) -> CoverEntityFeature: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Flag supported features, disable control if encryption is needed."""
|
||||
if (
|
||||
self._coord.data.get("home_id") and len(HOME_KEY) != 16
|
||||
) or self._coord.data.get("battery_charging"):
|
||||
if self._coord.data.get("home_id") and not self._coord.api.has_key:
|
||||
return CoverEntityFeature(0)
|
||||
|
||||
return super().supported_features
|
||||
@@ -129,7 +137,10 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
return
|
||||
self._target_position = round(target_position)
|
||||
try:
|
||||
await self._coord.api.set_position(round(target_position))
|
||||
await self._coord.api.set_position(
|
||||
round(target_position),
|
||||
velocity=self._coord.velocity,
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error(
|
||||
@@ -149,7 +160,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
return
|
||||
try:
|
||||
self._target_position = OPEN_POSITION
|
||||
await self._coord.api.open()
|
||||
await self._coord.api.open(velocity=self._coord.velocity)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error("Failed to open cover '%s': %s", self.name, err)
|
||||
@@ -162,7 +173,7 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
return
|
||||
try:
|
||||
self._target_position = CLOSED_POSITION
|
||||
await self._coord.api.close()
|
||||
await self._coord.api.close(velocity=self._coord.velocity)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error("Failed to close cover '%s': %s", self.name, err)
|
||||
@@ -197,6 +208,7 @@ class PowerViewCoverTilt(PowerViewCover):
|
||||
self,
|
||||
coordinator: PVCoordinator,
|
||||
) -> None:
|
||||
"""Initialize the shade with tilt."""
|
||||
LOGGER.debug("%s: init() PowerViewCoverTilt", coordinator.name)
|
||||
super().__init__(coordinator)
|
||||
|
||||
@@ -222,7 +234,9 @@ class PowerViewCoverTilt(PowerViewCover):
|
||||
|
||||
try:
|
||||
await self._coord.api.set_position(
|
||||
self.current_cover_position, tilt=target_position
|
||||
self.current_cover_position,
|
||||
tilt=target_position,
|
||||
velocity=self._coord.velocity,
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
@@ -235,7 +249,7 @@ class PowerViewCoverTilt(PowerViewCover):
|
||||
|
||||
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Stop the cover."""
|
||||
await self.async_stop_cover(kwargs=kwargs)
|
||||
await self.async_stop_cover(**kwargs)
|
||||
|
||||
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Open the cover tilt."""
|
||||
@@ -248,3 +262,140 @@ class PowerViewCoverTilt(PowerViewCover):
|
||||
LOGGER.debug("close cover tilt")
|
||||
_kwargs = {**kwargs, ATTR_TILT_POSITION: CLOSED_POSITION}
|
||||
await self.async_set_cover_tilt_position(**_kwargs)
|
||||
|
||||
|
||||
class PowerViewCoverTiltOnClosed(PowerViewCoverTilt):
|
||||
"""Representation of a PowerView shade whose tilt is only available when closed.
|
||||
|
||||
Examples: Bottom Up 90° (type 18), Twist (type 44).
|
||||
|
||||
If a tilt command arrives while the shade is open, the shade is closed first
|
||||
so the tilt mechanism is engaged before the command is sent.
|
||||
"""
|
||||
|
||||
def __init__(self, coordinator: PVCoordinator) -> None:
|
||||
"""Initialize the shade."""
|
||||
LOGGER.debug("%s: init() PowerViewCoverTiltOnClosed", coordinator.name)
|
||||
super().__init__(coordinator)
|
||||
|
||||
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
|
||||
"""Move the tilt to a specific position, closing first if needed."""
|
||||
if self.current_cover_position != CLOSED_POSITION:
|
||||
LOGGER.debug("tilt-on-closed: closing shade before tilting")
|
||||
try:
|
||||
self._target_position = CLOSED_POSITION
|
||||
await self._coord.api.close(velocity=self._coord.velocity)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error("Failed to close cover '%s' before tilt: %s", self.name, err)
|
||||
self._reset_target_position()
|
||||
return
|
||||
await super().async_set_cover_tilt_position(**kwargs)
|
||||
|
||||
|
||||
class PowerViewCoverTopDown(PowerViewCover):
|
||||
"""Representation of a top-down PowerView shade.
|
||||
|
||||
The device position axis is inverted: device 0 = open (fabric retracted),
|
||||
device 100 = closed (fabric fully extended). We translate at the boundary
|
||||
so HA's standard 0=closed / 100=open convention is preserved.
|
||||
"""
|
||||
|
||||
@property
|
||||
def current_cover_position(self) -> int | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return current position, inverting the device axis."""
|
||||
pos: Final = self._coord.data.get(ATTR_CURRENT_POSITION)
|
||||
return OPEN_POSITION - round(pos) if pos is not None else None
|
||||
|
||||
async def async_set_cover_position(self, **kwargs: Any) -> None:
|
||||
"""Move the cover to a specific position, inverting for the device."""
|
||||
target_position: Final = kwargs.get(ATTR_POSITION)
|
||||
if target_position is not None:
|
||||
inverted = OPEN_POSITION - round(target_position)
|
||||
LOGGER.debug("set top-down cover to position %f (device %i)", target_position, inverted)
|
||||
if self.current_cover_position == round(target_position) and not (
|
||||
self.is_closing or self.is_opening
|
||||
):
|
||||
return
|
||||
self._target_position = round(target_position)
|
||||
try:
|
||||
await self._coord.api.set_position(
|
||||
inverted,
|
||||
velocity=self._coord.velocity,
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error(
|
||||
"Failed to move cover '%s' to %f%%: %s",
|
||||
self.name,
|
||||
target_position,
|
||||
err,
|
||||
)
|
||||
|
||||
async def async_open_cover(self, **kwargs: Any) -> None:
|
||||
"""Open the cover (send device position 0)."""
|
||||
LOGGER.debug("open top-down cover")
|
||||
if self.current_cover_position == OPEN_POSITION:
|
||||
return
|
||||
try:
|
||||
self._target_position = OPEN_POSITION
|
||||
await self._coord.api.set_position(CLOSED_POSITION, velocity=self._coord.velocity)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error("Failed to open cover '%s': %s", self.name, err)
|
||||
self._reset_target_position()
|
||||
|
||||
async def async_close_cover(self, **kwargs: Any) -> None:
|
||||
"""Close the cover (send device position 100)."""
|
||||
LOGGER.debug("close top-down cover")
|
||||
if self.current_cover_position == CLOSED_POSITION:
|
||||
return
|
||||
try:
|
||||
self._target_position = CLOSED_POSITION
|
||||
await self._coord.api.set_position(OPEN_POSITION, velocity=self._coord.velocity)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error("Failed to close cover '%s': %s", self.name, err)
|
||||
self._reset_target_position()
|
||||
|
||||
|
||||
class PowerViewCoverTiltOnly(PowerViewCoverTilt):
|
||||
"""Representation of a PowerView shade with additional tilt functionality."""
|
||||
|
||||
OPENCLOSED_THRESHOLD = 5
|
||||
|
||||
_attr_device_class = CoverDeviceClass.BLIND
|
||||
_attr_supported_features = (
|
||||
CoverEntityFeature.OPEN_TILT
|
||||
| CoverEntityFeature.CLOSE_TILT
|
||||
| CoverEntityFeature.STOP_TILT
|
||||
| CoverEntityFeature.SET_TILT_POSITION
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: PVCoordinator,
|
||||
) -> None:
|
||||
"""Initialize the shade with tilt only."""
|
||||
LOGGER.debug("%s: init() PowerViewCoverTiltOnly", coordinator.name)
|
||||
super().__init__(coordinator)
|
||||
|
||||
@property
|
||||
def is_opening(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return if the cover is opening or not."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def is_closing(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return if the cover is closing or not."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def is_closed(self) -> bool: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return if the cover is closed."""
|
||||
return isinstance(self.current_cover_tilt_position, int) and (
|
||||
self.current_cover_tilt_position
|
||||
>= OPEN_POSITION - PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
|
||||
or self.current_cover_tilt_position
|
||||
<= CLOSED_POSITION + PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
|
||||
)
|
||||
|
||||
@@ -11,10 +11,10 @@
|
||||
"config_flow": true,
|
||||
"dependencies": ["bluetooth_adapters"],
|
||||
"documentation": "https://github.com/patman15/hdpv_ble",
|
||||
"integration_type": "device",
|
||||
"integration_type": "hub",
|
||||
"iot_class": "local_polling",
|
||||
"issue_tracker": "https://github.com/patman15/hdpv_ble/issues",
|
||||
"loggers": ["hunterdouglas_powerview_ble"],
|
||||
"requirements": ["cryptography>=43.0.0"],
|
||||
"version": "0.22"
|
||||
"version": "0.24"
|
||||
}
|
||||
|
||||
74
custom_components/hunterdouglas_powerview_ble/number.py
Normal file
74
custom_components/hunterdouglas_powerview_ble/number.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""Hunter Douglas PowerView velocity control."""
|
||||
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
PassiveBluetoothCoordinatorEntity,
|
||||
)
|
||||
from homeassistant.components.number import NumberMode, RestoreNumber
|
||||
from homeassistant.const import EntityCategory
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.device_registry import format_mac
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from . import ConfigEntryType, async_setup_shade_platform
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .coordinator import PVCoordinator
|
||||
|
||||
|
||||
def _add_entities(
|
||||
coordinator: PVCoordinator, async_add_entities: AddEntitiesCallback
|
||||
) -> None:
|
||||
"""Create velocity number entity for a single shade coordinator."""
|
||||
async_add_entities([PowerViewVelocity(coordinator)])
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntryType,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up the velocity number entity."""
|
||||
async_setup_shade_platform(hass, config_entry, async_add_entities, _add_entities)
|
||||
|
||||
|
||||
class PowerViewVelocity(
|
||||
PassiveBluetoothCoordinatorEntity[PVCoordinator], RestoreNumber
|
||||
): # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Number entity to control shade movement velocity."""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
_attr_name = "Velocity"
|
||||
_attr_icon = "mdi:speedometer"
|
||||
_attr_mode = NumberMode.SLIDER
|
||||
_attr_native_min_value = 0
|
||||
_attr_native_max_value = 100
|
||||
_attr_native_step = 1
|
||||
_attr_entity_category = EntityCategory.CONFIG
|
||||
|
||||
def __init__(self, coordinator: PVCoordinator) -> None:
|
||||
"""Initialize the velocity entity."""
|
||||
self._coord = coordinator
|
||||
self._attr_device_info = self._coord.device_info
|
||||
self._attr_unique_id = (
|
||||
f"{DOMAIN}_{format_mac(self._coord.address)}_velocity"
|
||||
)
|
||||
super().__init__(coordinator)
|
||||
|
||||
@property
|
||||
def native_value(self) -> int:
|
||||
"""Return the current velocity value."""
|
||||
return self._coord.velocity
|
||||
|
||||
async def async_added_to_hass(self) -> None:
|
||||
"""Restore last known velocity on startup."""
|
||||
await super().async_added_to_hass()
|
||||
last_data = await self.async_get_last_number_data()
|
||||
if last_data and last_data.native_value is not None:
|
||||
self._coord.velocity = int(last_data.native_value)
|
||||
LOGGER.debug(
|
||||
"%s: restored velocity to %s", self._coord.name, self._coord.velocity
|
||||
)
|
||||
|
||||
async def async_set_native_value(self, value: float) -> None:
|
||||
"""Set the velocity value."""
|
||||
self._coord.velocity = int(value)
|
||||
self.async_write_ha_state()
|
||||
@@ -3,14 +3,8 @@
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
PassiveBluetoothCoordinatorEntity,
|
||||
)
|
||||
from homeassistant.components.sensor import (
|
||||
SensorEntity,
|
||||
SensorEntityDescription,
|
||||
)
|
||||
from homeassistant.components.sensor.const import (
|
||||
SensorDeviceClass,
|
||||
SensorStateClass,
|
||||
)
|
||||
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
|
||||
from homeassistant.components.sensor.const import SensorDeviceClass, SensorStateClass
|
||||
from homeassistant.const import (
|
||||
ATTR_BATTERY_LEVEL,
|
||||
PERCENTAGE,
|
||||
@@ -21,7 +15,7 @@ from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.device_registry import format_mac
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from . import ConfigEntryType
|
||||
from . import ConfigEntryType, async_setup_shade_platform
|
||||
from .const import ATTR_RSSI, DOMAIN
|
||||
from .coordinator import PVCoordinator
|
||||
|
||||
@@ -45,18 +39,25 @@ SENSOR_TYPES: list[SensorEntityDescription] = [
|
||||
]
|
||||
|
||||
|
||||
def _add_entities(
|
||||
coordinator: PVCoordinator, async_add_entities: AddEntitiesCallback
|
||||
) -> None:
|
||||
"""Create sensor entities for a single shade coordinator."""
|
||||
async_add_entities(
|
||||
[
|
||||
PVSensor(coordinator, descr, format_mac(coordinator.address))
|
||||
for descr in SENSOR_TYPES
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
_hass: HomeAssistant,
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntryType,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Add sensors for passed config_entry in Home Assistant."""
|
||||
|
||||
pv_dev: PVCoordinator = config_entry.runtime_data
|
||||
for descr in SENSOR_TYPES:
|
||||
async_add_entities(
|
||||
[PVSensor(pv_dev, descr, format_mac(config_entry.unique_id))]
|
||||
)
|
||||
async_setup_shade_platform(hass, config_entry, async_add_entities, _add_entities)
|
||||
|
||||
|
||||
class PVSensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], SensorEntity): # type: ignore[reportIncompatibleMethodOverride]
|
||||
@@ -67,7 +68,7 @@ class PVSensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], SensorEntity):
|
||||
def __init__(
|
||||
self, pv_dev: PVCoordinator, descr: SensorEntityDescription, unique_id: str
|
||||
) -> None:
|
||||
"""Intitialize the BMS sensor."""
|
||||
"""Initialize the BMS sensor."""
|
||||
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
|
||||
self._attr_device_info = pv_dev.device_info
|
||||
self.entity_description = descr
|
||||
|
||||
@@ -1,15 +1,32 @@
|
||||
{
|
||||
"config": {
|
||||
"flow_title": "Setup {name}",
|
||||
"flow_title": "PowerView Home Setup",
|
||||
"step": {
|
||||
"bluetooth_confirm": {
|
||||
"description": "[%key:component::bluetooth::config::step::bluetooth_confirm::description%]"
|
||||
"user": {
|
||||
"title": "Set up PowerView Home",
|
||||
"description": "Configure your PowerView encryption key. All shades on your network will be discovered automatically via Bluetooth.",
|
||||
"data": {
|
||||
"key_method": "Key source",
|
||||
"hub_url": "PowerView hub URL",
|
||||
"home_key": "HomeKey (32 hex characters or \\xNN format)"
|
||||
},
|
||||
"data_description": {
|
||||
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
|
||||
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
|
||||
}
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
"invalid_key_format": "HomeKey must be a valid hexadecimal string",
|
||||
"invalid_key_length": "HomeKey must be exactly 32 hex characters (16 bytes)",
|
||||
"hub_connection_error": "Cannot connect to the PowerView hub",
|
||||
"hub_http_error": "Hub returned an HTTP error",
|
||||
"hub_timeout": "Connection to hub timed out",
|
||||
"hub_protocol_error": "Hub returned an unexpected response"
|
||||
},
|
||||
"abort": {
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
|
||||
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]",
|
||||
"not_supported": "Device not supported"
|
||||
"single_instance_allowed": "[%key:common::config_flow::abort::single_instance_allowed%]"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,32 @@
|
||||
{
|
||||
"config": {
|
||||
"flow_title": "PowerView Home Setup",
|
||||
"step": {
|
||||
"user": {
|
||||
"title": "Set up PowerView Home",
|
||||
"description": "Configure your PowerView encryption key. All shades on your network will be discovered automatically via Bluetooth.",
|
||||
"data": {
|
||||
"key_method": "Key source",
|
||||
"hub_url": "PowerView hub URL",
|
||||
"home_key": "HomeKey (32 hex characters or \\xNN format)"
|
||||
},
|
||||
"data_description": {
|
||||
"hub_url": "Base URL of your PowerView G3 hub, e.g. {hub_url_example}",
|
||||
"home_key": "32-character hex string (e.g. 0102030405060708090a0b0c0d0e0f10) or \\x escaped format (e.g. \\x01\\x02...)"
|
||||
}
|
||||
}
|
||||
},
|
||||
"error": {
|
||||
"invalid_key_format": "HomeKey must be a valid hexadecimal string",
|
||||
"invalid_key_length": "HomeKey must be exactly 32 hex characters (16 bytes)",
|
||||
"hub_connection_error": "Cannot connect to the PowerView hub",
|
||||
"hub_http_error": "Hub returned an HTTP error",
|
||||
"hub_timeout": "Connection to hub timed out",
|
||||
"hub_protocol_error": "Hub returned an unexpected response"
|
||||
},
|
||||
"abort": {
|
||||
"already_configured": "Device is already configured",
|
||||
"no_devices_found": "No devices found on the network",
|
||||
"not_supported": "Device not supported"
|
||||
},
|
||||
"flow_title": "Setup {name}",
|
||||
"step": {
|
||||
"bluetooth_confirm": {
|
||||
"description": "Do you want to set up {name}?"
|
||||
}
|
||||
"single_instance_allowed": "Already configured. Only a single configuration possible."
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,12 @@
|
||||
* Emulate a Hunter Douglas PowerView cover device using ESP32
|
||||
* used e.g. to gain the home_key from an existing installation via BLE
|
||||
*
|
||||
* REQUIRES:
|
||||
* - ESP32 Board Definitions 3.0.x (tested on 3.0.7)
|
||||
* - WolfSSL 5.7.x (tested on 5.7.6)
|
||||
* - Phone Region: Potentially an alternative region depending on the app response
|
||||
* - e.g. To add Parkland shades in the US, phone region set to the UK temporarily
|
||||
*
|
||||
* TODO:
|
||||
* - cleanup code
|
||||
* - think about emulating a remote
|
||||
@@ -11,9 +17,13 @@
|
||||
*/
|
||||
|
||||
#define NAME "myPVcover"
|
||||
#define FW_VERSION "391"
|
||||
#define SERIAL_NR "01234567890ABCDEF"
|
||||
|
||||
const uint16_t SW_VERSION = 391;
|
||||
const char *SERIAL_NR = "01234567890ABCDEF";
|
||||
const uint16_t TYP_ID = 42; // 62
|
||||
const uint16_t MODEL_ID = 224;
|
||||
const uint16_t FW_REVISION = 27;
|
||||
const uint32_t HW_REVISION = 171103;
|
||||
const uint8_t BATTERY_LEVEL = 42;
|
||||
|
||||
#include <BLEDevice.h>
|
||||
#include <BLEServer.h>
|
||||
@@ -40,7 +50,11 @@ int devId = INVALID_DEVID; //if not using async INVALID_DEVID is default
|
||||
|
||||
#define DEV_SERVICE_UUID BLEUUID("180A")
|
||||
#define SER_CHAR_UUID BLEUUID("2A25")
|
||||
#define SWC_CHAR_UUID BLEUUID("2A28")
|
||||
#define MAN_CHAR_UUID BLEUUID("2A29")
|
||||
#define MOD_CHAR_UUID BLEUUID("2A24")
|
||||
#define FWR_CHAR_UUID BLEUUID("2A26")
|
||||
#define HWR_CHAR_UUID BLEUUID("2A27")
|
||||
#define SWR_CHAR_UUID BLEUUID("2A28")
|
||||
|
||||
#define BAT_SERVICE_UUID BLEUUID("180F")
|
||||
#define BAT_CHAR_UUID BLEUUID("2A19")
|
||||
@@ -69,7 +83,7 @@ struct notification {
|
||||
};
|
||||
|
||||
BLECharacteristic *pCharacteristic_cover, *pCharacteristic_fw, *pCharacteristic_unknown, *pCharacteristic_bat;
|
||||
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser;
|
||||
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser, *pCharacteristic_man, *pCharacteristic_mod, *pCharacteristic_fwr, *pCharacteristic_hwr;
|
||||
BLEServer *pServer = NULL;
|
||||
bool deviceConnected = false;
|
||||
bool oldDeviceConnected = false;
|
||||
@@ -158,12 +172,12 @@ void decode(BLECharacteristic *pChar) {
|
||||
memcpy((void *)&msg, data_dec, 4);
|
||||
Serial.printf("\t message: SRV: %02x, CMD %02x, SEQ %i, LEN %i\n", msg.serviceID, msg.cmdID, msg.sequence, msg.data_len);
|
||||
|
||||
// sepecial responses (static data!)
|
||||
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
|
||||
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x2a, 0xe0, 0x08 }; // HW diagnostics
|
||||
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
|
||||
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
|
||||
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
|
||||
// special responses (static data!)
|
||||
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
|
||||
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, (uint8_t)(SW_VERSION & 0xFF), (uint8_t)(SW_VERSION >> 8), 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, TYP_ID, MODEL_ID, 0x08 }; // HW diagnostics
|
||||
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
|
||||
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
|
||||
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
|
||||
|
||||
Serial.print("\t\t");
|
||||
switch ((msg.serviceID << 8) | msg.cmdID) {
|
||||
@@ -179,7 +193,7 @@ void decode(BLECharacteristic *pChar) {
|
||||
break;
|
||||
case 0xF711:
|
||||
// identify
|
||||
Serial.printf("identify: %i\n", data_dec[4]);
|
||||
Serial.printf("identify: %i times\n", data_dec[4]);
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
break;
|
||||
case 0xF7B8:
|
||||
@@ -208,8 +222,8 @@ void decode(BLECharacteristic *pChar) {
|
||||
case 0xFB02:
|
||||
// set shade key
|
||||
Serial.print("set shade key: ");
|
||||
print_hex(&data_raw[4], data_len - 4, "\\x");
|
||||
// set resonse before key, to acknowledge unencrypted
|
||||
print_hex(&data_raw[4], data_len - 4, "\\x", "");
|
||||
// set response before key, to acknowledge unencrypted
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
if (msg.data_len == 16) {
|
||||
memcpy(home_key, &data_raw[4], 16);
|
||||
@@ -251,7 +265,9 @@ void decode(BLECharacteristic *pChar) {
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
break;
|
||||
default:
|
||||
Serial.println(F("*********************************** unknown message"));
|
||||
Serial.println(F("*********************************** unknown message (try ACK)"));
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
break;
|
||||
}
|
||||
if (resp_size) {
|
||||
pChar->setValue((uint8_t *)&response, resp_size);
|
||||
@@ -291,7 +307,7 @@ class coverCallbacks : public BLECharacteristicCallbacks {
|
||||
void onNotify(BLECharacteristic *pCharacteristic) {
|
||||
Serial.printf("Cover onNotify() %s\n", pCharacteristic->toString().c_str());
|
||||
}
|
||||
|
||||
|
||||
void onStatus(BLECharacteristic *pCharacteristic, Status s, uint32_t code) {
|
||||
Serial.printf("Cover onStatus() %s: %s\n", BLEstate[s], pCharacteristic->toString().c_str());
|
||||
}
|
||||
@@ -341,6 +357,7 @@ class genericCallbacks : public BLECharacteristicCallbacks {
|
||||
|
||||
void setup() {
|
||||
Serial.begin(115200);
|
||||
delay(1000); // wait for terminal to be ready
|
||||
Serial.println(NAME " initializing ...");
|
||||
|
||||
BLEDevice::init(NAME);
|
||||
@@ -373,8 +390,7 @@ pDesc1->setValue("cover");*/
|
||||
BAT_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_bat->setCallbacks(new batteryCallbacks());
|
||||
uint8_t battery_level = 42;
|
||||
pCharacteristic_bat->setValue(&battery_level, 1);
|
||||
pCharacteristic_bat->setValue((uint8_t *)&BATTERY_LEVEL, 1);
|
||||
pCharacteristic_bat->addDescriptor(new BLE2902());
|
||||
|
||||
BLEService *pFWService = pServer->createService(FW_SERVICE_UUID);
|
||||
@@ -390,9 +406,9 @@ pDesc1->setValue("cover");*/
|
||||
|
||||
BLEService *pDEVService = pServer->createService(DEV_SERVICE_UUID);
|
||||
pCharacteristic_dev = pDEVService->createCharacteristic(
|
||||
SWC_CHAR_UUID,
|
||||
SWR_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_dev->setValue(FW_VERSION);
|
||||
pCharacteristic_dev->setValue(String(SW_VERSION));
|
||||
pCharacteristic_dev->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_ser = pDEVService->createCharacteristic(
|
||||
SER_CHAR_UUID,
|
||||
@@ -400,6 +416,28 @@ pDesc1->setValue("cover");*/
|
||||
pCharacteristic_ser->setValue(SERIAL_NR);
|
||||
pCharacteristic_ser->setCallbacks(new genericCallbacks());
|
||||
|
||||
pCharacteristic_man = pDEVService->createCharacteristic(
|
||||
MAN_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_man->setValue("Hunter Douglas");
|
||||
pCharacteristic_man->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_mod = pDEVService->createCharacteristic(
|
||||
MOD_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_mod->setValue(String(TYP_ID));
|
||||
pCharacteristic_mod->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_fwr = pDEVService->createCharacteristic(
|
||||
FWR_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_fwr->setValue(String(FW_REVISION));
|
||||
pCharacteristic_fwr->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_hwr = pDEVService->createCharacteristic(
|
||||
HWR_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_hwr->setValue(String(HW_REVISION));
|
||||
pCharacteristic_hwr->setCallbacks(new genericCallbacks());
|
||||
|
||||
|
||||
// Start the services
|
||||
pCovService->start();
|
||||
pFWService->start();
|
||||
@@ -408,9 +446,10 @@ pDesc1->setValue("cover");*/
|
||||
|
||||
// Start advertising
|
||||
BLEAdvertisementData AdvertisementData;
|
||||
const String manufacturerData = String("\x19\x08\x00\x00\x2A\x00\x00\x00\x00\x00\xA2", 11);
|
||||
// Hunter Douglas ^^--^^ ^^key^^ ^^ ID-Type
|
||||
AdvertisementData.setManufacturerData(manufacturerData);
|
||||
const char adv[] = { 0x19, 0x08, 0x00, 0x00, TYP_ID, 0x00, 0x00, 0x00, 0x00, 0x00, 0xA2 };
|
||||
// Hunter Douglas ^^ -- ^^ ^^key ^^ ^--pos1--^
|
||||
|
||||
AdvertisementData.setManufacturerData(String(adv, 11));
|
||||
AdvertisementData.setPartialServices(BLEUUID(COVER_SERVICE_UUID));
|
||||
AdvertisementData.setFlags((1 << 2) | (1 << 1)); // [BR/EDR Not Supported] | [LE General Discoverable Mode]
|
||||
|
||||
|
||||
BIN
img/1200x630wa.png
Normal file
BIN
img/1200x630wa.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 40 KiB |
BIN
img/icon.png
Normal file
BIN
img/icon.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 11 KiB |
BIN
img/icon@2x.png
Normal file
BIN
img/icon@2x.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 29 KiB |
BIN
img/icon_full.png
Normal file
BIN
img/icon_full.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 31 KiB |
BIN
img/logo.png
Normal file
BIN
img/logo.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 44 KiB |
BIN
img/logo@2x.png
Normal file
BIN
img/logo@2x.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 53 KiB |
@@ -1,7 +1,17 @@
|
||||
# pyproject.toml
|
||||
|
||||
[project]
|
||||
requires-python = ">=3.12.0"
|
||||
name = "hunterdouglas_powerview_ble"
|
||||
classifiers = [
|
||||
"License :: OSI Approved :: Apache Software License",
|
||||
"Programming Language :: Python :: 3.13"
|
||||
]
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13.2"
|
||||
|
||||
[project.urls]
|
||||
"Source Code" = "https://github.com/patman15/hdpv_ble/"
|
||||
"Bug Reports" = "https://github.com/patman15/hdpv_ble/issues"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
minversion = "8.0"
|
||||
@@ -14,9 +24,9 @@ testpaths = [
|
||||
]
|
||||
asyncio_mode = "auto"
|
||||
|
||||
# ruff configuration taken from HA 2024.11.2 (less ignores)
|
||||
# ruff settings from HA 2025.2.2
|
||||
[tool.ruff]
|
||||
required-version = ">=0.6.8"
|
||||
required-version = ">=0.9.1"
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = [
|
||||
@@ -35,8 +45,10 @@ select = [
|
||||
"B017", # pytest.raises(BaseException) should be considered evil
|
||||
"B018", # Found useless attribute access. Either assign it to a variable or remove it.
|
||||
"B023", # Function definition does not bind loop variable {name}
|
||||
"B024", # `{name}` is an abstract base class, but it has no abstract methods or properties
|
||||
"B026", # Star-arg unpacking after a keyword argument is strongly discouraged
|
||||
"B032", # Possible unintentional type annotation (using :). Did you mean to assign (using =)?
|
||||
"B035", # Dictionary comprehension uses static key
|
||||
"B904", # Use raise from to specify exception cause
|
||||
"B905", # zip() without an explicit strict= parameter
|
||||
"BLE",
|
||||
@@ -70,12 +82,27 @@ select = [
|
||||
"RSE", # flake8-raise
|
||||
"RUF005", # Consider iterable unpacking instead of concatenation
|
||||
"RUF006", # Store a reference to the return value of asyncio.create_task
|
||||
"RUF007", # Prefer itertools.pairwise() over zip() when iterating over successive pairs
|
||||
"RUF008", # Do not use mutable default values for dataclass attributes
|
||||
"RUF010", # Use explicit conversion flag
|
||||
"RUF013", # PEP 484 prohibits implicit Optional
|
||||
"RUF016", # Slice in indexed access to type {value_type} uses type {index_type} instead of an integer
|
||||
"RUF017", # Avoid quadratic list summation
|
||||
"RUF018", # Avoid assignment expressions in assert statements
|
||||
"RUF019", # Unnecessary key check before dictionary access
|
||||
# "RUF100", # Unused `noqa` directive; temporarily every now and then to clean them up
|
||||
"RUF020", # {never_like} | T is equivalent to T
|
||||
"RUF021", # Parenthesize a and b expressions when chaining and and or together, to make the precedence clear
|
||||
"RUF022", # Sort __all__
|
||||
"RUF023", # Sort __slots__
|
||||
"RUF024", # Do not pass mutable objects as values to dict.fromkeys
|
||||
"RUF026", # default_factory is a positional-only argument to defaultdict
|
||||
"RUF030", # print() call in assert statement is likely unintentional
|
||||
"RUF032", # Decimal() called with float literal argument
|
||||
"RUF033", # __post_init__ method with argument defaults
|
||||
"RUF034", # Useless if-else condition
|
||||
"RUF100", # Unused `noqa` directive
|
||||
"RUF101", # noqa directives that use redirected rule codes
|
||||
"RUF200", # Failed to parse pyproject.toml: {message}
|
||||
"S102", # Use of exec detected
|
||||
"S103", # bad-file-permissions
|
||||
"S108", # hardcoded-temp-file
|
||||
@@ -88,7 +115,7 @@ select = [
|
||||
"S317", # suspicious-xml-sax-usage
|
||||
"S318", # suspicious-xml-mini-dom-usage
|
||||
"S319", # suspicious-xml-pull-dom-usage
|
||||
"S320", # suspicious-xmle-tree-usage
|
||||
# "S320", # suspicious-xmle-tree-usage
|
||||
"S601", # paramiko-call
|
||||
"S602", # subprocess-popen-with-shell-equals-true
|
||||
"S604", # call-with-shell-equals-true
|
||||
@@ -99,7 +126,7 @@ select = [
|
||||
"SLOT", # flake8-slots
|
||||
"T100", # Trace found: {name} used
|
||||
"T20", # flake8-print
|
||||
"TCH", # flake8-type-checking
|
||||
"TC", # flake8-type-checking
|
||||
"TID", # Tidy imports
|
||||
"TRY", # tryceratops
|
||||
"UP", # pyupgrade
|
||||
@@ -119,13 +146,12 @@ ignore = [
|
||||
# "PLC1901", # {existing} can be simplified to {replacement} as an empty string is falsey; too many false positives
|
||||
# "PLR0911", # Too many return statements ({returns} > {max_returns})
|
||||
# "PLR0912", # Too many branches ({branches} > {max_branches})
|
||||
# "PLR0913", # Too many arguments to function call ({c_args} > {max_args})
|
||||
"PLR0913", # Too many arguments to function call ({c_args} > {max_args})
|
||||
# "PLR0915", # Too many statements ({statements} > {max_statements})
|
||||
"PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable
|
||||
# "PLW2901", # Outer {outer_kind} variable {name} overwritten by inner {inner_kind} target
|
||||
# "PT004", # Fixture {fixture} does not return anything, add leading underscore
|
||||
# "PT011", # pytest.raises({exception}) is too broad, set the `match` parameter or use a more specific exception
|
||||
# "PT018", # Assertion should be broken down into multiple parts
|
||||
"PT018", # Assertion should be broken down into multiple parts
|
||||
# "RUF001", # String contains ambiguous unicode character.
|
||||
# "RUF002", # Docstring contains ambiguous unicode character.
|
||||
# "RUF003", # Comment contains ambiguous unicode character.
|
||||
@@ -136,14 +162,12 @@ ignore = [
|
||||
# "SIM115", # Use context handler for opening files
|
||||
|
||||
# Moving imports into type-checking blocks can mess with pytest.patch()
|
||||
"TCH001", # Move application import {} into a type-checking block
|
||||
"TCH002", # Move third-party import {} into a type-checking block
|
||||
"TCH003", # Move standard library import {} into a type-checking block
|
||||
"TC001", # Move application import {} into a type-checking block
|
||||
"TC002", # Move third-party import {} into a type-checking block
|
||||
"TC003", # Move standard library import {} into a type-checking block
|
||||
|
||||
"TRY003", # Avoid specifying long messages outside the exception class
|
||||
"TRY400", # Use `logging.exception` instead of `logging.error`
|
||||
# Ignored due to performance: https://github.com/charliermarsh/ruff/issues/2923
|
||||
"UP038", # Use `X | Y` in `isinstance` call instead of `(X, Y)`
|
||||
|
||||
# May conflict with the formatter, https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
|
||||
"W191",
|
||||
@@ -161,3 +185,14 @@ ignore = [
|
||||
"PLE0605"
|
||||
]
|
||||
|
||||
|
||||
[tool.ruff.lint.isort]
|
||||
force-sort-within-sections = true
|
||||
known-first-party = [
|
||||
"homeassistant",
|
||||
]
|
||||
combine-as-imports = true
|
||||
split-on-trailing-comma = false
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"scripts/*" = ["T201"]
|
||||
@@ -1,4 +1,6 @@
|
||||
homeassistant==2024.11.0
|
||||
homeassistant==2025.11.0
|
||||
pip>=21.3.1
|
||||
ruff==0.6.8
|
||||
|
||||
ruff>=0.9.1,<=0.15.0
|
||||
types-requests
|
||||
mypy~=1.19.1
|
||||
codespell
|
||||
|
||||
@@ -2,9 +2,9 @@
|
||||
|
||||
wheel
|
||||
home-assistant-bluetooth
|
||||
habluetooth>=3.6.0
|
||||
habluetooth>=5.3.0
|
||||
bluetooth-adapters
|
||||
pytest>=8.3.3
|
||||
pytest>=8.4.1
|
||||
pytest-cov>=5.0.0
|
||||
pytest-socket>=0.7.0
|
||||
pytest-asyncio>=0.24.0
|
||||
@@ -16,10 +16,10 @@ aiohttp
|
||||
aiohttp_cors
|
||||
aiohttp-fast-url-dispatcher
|
||||
aiohttp-zlib-ng
|
||||
bleak>=0.22.3
|
||||
bleak-retry-connector>=3.6.0
|
||||
bleak>=1.0.1
|
||||
bleak-retry-connector>=4.4.3
|
||||
bluetooth-data-tools
|
||||
pyserial-asyncio
|
||||
pyudev
|
||||
pytest-homeassistant-custom-component==0.13.181
|
||||
pytest-homeassistant-custom-component==0.13.294
|
||||
|
||||
|
||||
1
scripts/__init__.py
Normal file
1
scripts/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Script to extract PowerView homekey from a G3 PowerView Gateway."""
|
||||
119
scripts/extract_gateway3_homekey.py
Normal file
119
scripts/extract_gateway3_homekey.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""Extract PowerView homekey from a G3 PowerView Gateway."""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import struct
|
||||
from typing import Any, Final
|
||||
|
||||
import requests
|
||||
|
||||
HUB: Final[str] = "http://powerview-g3.local"
|
||||
TIMEOUT: Final[int] = 10
|
||||
|
||||
|
||||
def create_request(sid: int, cid: int, sequence_id: int, data: bytes) -> bytes:
|
||||
"""Assemble a request frame for the PowerView protocol."""
|
||||
return struct.pack("<BBBB", sid, cid, sequence_id, len(data)) + data
|
||||
|
||||
|
||||
def decode_response(packet: bytes) -> dict[str, Any]:
|
||||
"""Decode a response frame from the PowerView protocol."""
|
||||
if len(packet) < 4:
|
||||
raise ValueError("Packet size too small")
|
||||
sid, cid, sequence_id, length = struct.unpack("<BBBB", packet[0:4])
|
||||
if len(packet) != 4 + length:
|
||||
raise ValueError("Not all data present")
|
||||
if length < 1:
|
||||
raise ValueError("No errorCode present")
|
||||
(error_code,) = struct.unpack("<B", packet[4:5])
|
||||
data: Final[bytes] = packet[5:]
|
||||
return {
|
||||
"cid": cid,
|
||||
"sid": sid,
|
||||
"sequenceId": sequence_id,
|
||||
"errorCode": error_code,
|
||||
"data": data,
|
||||
}
|
||||
|
||||
|
||||
def create_get_shade_key_request(sequence_id) -> bytes:
|
||||
"""Create a GetShadeKey request frame."""
|
||||
return create_request(251, 18, sequence_id, b"")
|
||||
|
||||
|
||||
def get_shade_key(hub: str, ble_name) -> bytes:
|
||||
"""Get the homekey for a shade."""
|
||||
try:
|
||||
shades_exec_resp: requests.Response = requests.post(
|
||||
hub + "/home/shades/exec?shades=" + ble_name,
|
||||
json={"hex": create_get_shade_key_request(1).hex()},
|
||||
timeout=TIMEOUT,
|
||||
)
|
||||
shades_exec_resp.raise_for_status()
|
||||
except requests.exceptions.RequestException as ex:
|
||||
print(f"Unable to send GetShadeKey {ex!s}")
|
||||
raise
|
||||
|
||||
result: dict = json.loads(shades_exec_resp.content)
|
||||
responses = result.get("responses", [])
|
||||
if len(responses) != 1 or "hex" not in responses[0]:
|
||||
raise OSError(f"Error when attempting GetShadeKey: {result}")
|
||||
response: Final[bytes] = bytes.fromhex(responses[0]["hex"])
|
||||
dec_resp: Final[dict[str, Any]] = decode_response(response)
|
||||
if dec_resp["errorCode"] != 0:
|
||||
raise ValueError(
|
||||
f"BLE errorCode={dec_resp['errorCode']} data={dec_resp['data'].hex()}"
|
||||
)
|
||||
if len(dec_resp["data"]) != 16:
|
||||
raise ValueError("Expected 16 byte homekey")
|
||||
return dec_resp["data"]
|
||||
|
||||
|
||||
def main(hub: str) -> int:
|
||||
"""Extract the homekeys from all shades."""
|
||||
try:
|
||||
shades_resp: requests.Response = requests.get(
|
||||
hub + "/home/shades", timeout=TIMEOUT
|
||||
)
|
||||
shades_resp.raise_for_status()
|
||||
except requests.exceptions.RequestException as ex:
|
||||
print(f"Unable to get list of shades:\n\t{ex!s}")
|
||||
return -1
|
||||
|
||||
shades = json.loads(shades_resp.content)
|
||||
print(f"Found {len(shades)} shades, interrogating")
|
||||
network_key: bytes | None = None
|
||||
for shade in shades:
|
||||
name: str = base64.b64decode(shade["name"]).decode("utf-8")
|
||||
try:
|
||||
key: bytes = get_shade_key(hub, shade["bleName"])
|
||||
network_key = key
|
||||
except (OSError, ValueError) as ex:
|
||||
if network_key is not None:
|
||||
key = network_key
|
||||
print(f"Shade '{name}':")
|
||||
print(f"\tBLE name: '{shade['bleName']}'")
|
||||
print(f"\tHomeKey: {key.hex()} (shade unreachable, using network key)")
|
||||
else:
|
||||
print(f"Shade '{name}':")
|
||||
print(f"\tBLE name: '{shade['bleName']}'")
|
||||
print(f"\tHomeKey: ERROR - {ex}")
|
||||
continue
|
||||
|
||||
print(f"Shade '{name}':")
|
||||
print(f"\tBLE name: '{shade['bleName']}'")
|
||||
print(f"\tHomeKey: {key.hex()}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Extract PowerView homekey from a G3 PowerView Gateway"
|
||||
)
|
||||
parser.add_argument("hub", nargs="?", help="URL to HUB", default=HUB)
|
||||
args = parser.parse_args()
|
||||
sys.exit(main(**vars(args)))
|
||||
Reference in New Issue
Block a user