feat: refactor to a hub model

This commit is contained in:
Richard Mann
2026-04-09 11:03:33 +10:00
parent acb2c5ff52
commit e2bb3b5592
12 changed files with 357 additions and 556 deletions

View File

@@ -1,9 +1,6 @@
"""Config flow for BLE Battery Management System integration."""
"""Config flow for Hunter Douglas PowerView BLE integration."""
import asyncio
import base64
import contextlib
from dataclasses import dataclass
import hashlib
import struct
from typing import Any
@@ -11,12 +8,8 @@ import aiohttp
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.bluetooth import (
BluetoothServiceInfoBleak,
async_discovered_service_info,
)
from homeassistant.components.bluetooth import BluetoothServiceInfoBleak
from homeassistant.config_entries import ConfigFlowResult
from homeassistant.const import CONF_ADDRESS
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import (
@@ -28,32 +21,78 @@ from homeassistant.helpers.selector import (
TextSelectorType,
)
from .api import UUID_COV_SERVICE as UUID
from .const import CONF_HOME_KEY, DOMAIN, LOGGER, MFCT_ID
from .const import CONF_HOME_KEY, CONF_HUB_URL, DOMAIN, LOGGER
def _needs_encryption(manufacturer_data_hex: str) -> bool:
"""Return True if the BLE advertisement indicates encryption (home_id != 0)."""
data = bytearray.fromhex(manufacturer_data_hex)
if len(data) < 2:
return False
home_id = int.from_bytes(data[0:2], byteorder="little")
return home_id != 0
def _hub_unique_id(home_key: str) -> str:
"""Derive a stable unique ID for a hub entry from the home key."""
if home_key:
digest = hashlib.sha256(home_key.encode()).hexdigest()[:16]
return f"pvhome_{digest}"
return "pvhome_unencrypted"
@dataclass
class HubShadeInfo:
"""Shade metadata from the PowerView hub."""
def _parse_key_response(ble_name: str, result: dict) -> bytes | None: # noqa: PLR0911
"""Parse a shade exec response and return the 16-byte key, or None."""
if result.get("err"):
err_msg = (result.get("responses") or [{}])[0].get("errMsg", "unknown")
LOGGER.warning(
"Shade %s: hub BLE command failed (err=%s: %s)",
ble_name,
result["err"],
err_msg,
)
return None
name: str # Human-readable name (decoded from base64)
ble_name: str # BLE advertisement name, e.g. "DUE:94ED"
responses = result.get("responses", [])
if len(responses) != 1 or "hex" not in responses[0]:
LOGGER.warning(
"Shade %s returned unexpected response structure: %s",
ble_name,
result,
)
return None
response_bytes = bytes.fromhex(responses[0]["hex"])
if len(response_bytes) < 5:
LOGGER.warning(
"Shade %s response too short (%d bytes)", ble_name, len(response_bytes)
)
return None
_s, _c, _q, length = struct.unpack("<BBBB", response_bytes[0:4])
if len(response_bytes) != 4 + length:
LOGGER.warning(
"Shade %s frame length mismatch (header=%d, actual=%d)",
ble_name,
4 + length,
len(response_bytes),
)
return None
if response_bytes[4] != 0:
LOGGER.warning(
"Shade %s returned error status %d", ble_name, response_bytes[4]
)
return None
key_data = response_bytes[5:]
if len(key_data) != 16:
LOGGER.warning(
"Shade %s returned key of wrong length (%d, expected 16)",
ble_name,
len(key_data),
)
return None
return key_data
async def _fetch_shades_from_hub(
async def _fetch_key_from_hub(
hass: HomeAssistant, hub_url: str
) -> list[HubShadeInfo]:
"""Fetch shade list with human-readable names from a PowerView G3 hub.
) -> bytes:
"""Fetch 16-byte homekey from a PowerView G3 hub.
Tries each shade on the hub until one returns a valid key.
The key is network-wide so any reachable shade returns the same value.
Raises ValueError on protocol/key errors.
Raises aiohttp.ClientError on network errors.
Raises asyncio.TimeoutError on timeout.
"""
@@ -65,76 +104,33 @@ async def _fetch_shades_from_hub(
shades = await resp.json(content_type=None)
if not shades:
return []
hub_shades: list[HubShadeInfo] = []
for shade in shades:
ble_name = shade.get("bleName", "")
if not ble_name:
continue
name_b64 = shade.get("name", "")
try:
name = base64.b64decode(name_b64).decode("utf-8") if name_b64 else ble_name
except Exception: # noqa: BLE001
name = ble_name
hub_shades.append(HubShadeInfo(name=name, ble_name=ble_name))
return hub_shades
async def _fetch_key_and_shades_from_hub(
hass: HomeAssistant, hub_url: str
) -> tuple[bytes, list[HubShadeInfo]]:
"""Fetch 16-byte homekey and shade list from a PowerView G3 hub.
Returns (key, shade_list). The key is network-wide so any reachable shade
returns the same value. The shade list contains human-readable names that
can be used to label BLE-discovered devices.
Raises ValueError on protocol/key errors.
Raises aiohttp.ClientError on network errors.
Raises asyncio.TimeoutError on timeout.
"""
hub_shades = await _fetch_shades_from_hub(hass, hub_url)
if not hub_shades:
raise ValueError("No shades found on the hub")
session = async_get_clientsession(hass)
timeout = aiohttp.ClientTimeout(total=10)
ble_names = [s.get("bleName", "") for s in shades if s.get("bleName")]
if not ble_names:
raise ValueError("No BLE-capable shades found on the hub")
# GetShadeKey BLE request: sid=251, cid=18, seqId=1, data_len=0
request_frame = struct.pack("<BBBB", 251, 18, 1, 0)
# Try each shade until one returns a valid key (some may be out of range)
last_error: Exception = ValueError("No shades responded")
for hs in hub_shades:
for ble_name in ble_names:
try:
async with session.post(
f"{hub_url}/home/shades/exec?shades={hs.ble_name}",
f"{hub_url}/home/shades/exec?shades={ble_name}",
json={"hex": request_frame.hex()},
timeout=timeout,
) as resp:
resp.raise_for_status()
result = await resp.json(content_type=None)
except (TimeoutError, aiohttp.ClientError) as ex:
LOGGER.warning("Shade %s unreachable: %s", ble_name, ex)
last_error = ex
continue
responses = result.get("responses", [])
if len(responses) != 1 or "hex" not in responses[0]:
continue
response_bytes = bytes.fromhex(responses[0]["hex"])
if len(response_bytes) < 5:
continue
_s, _c, _q, length = struct.unpack("<BBBB", response_bytes[0:4])
if len(response_bytes) != 4 + length:
continue
if response_bytes[4] != 0:
continue
key_data = response_bytes[5:]
if len(key_data) != 16:
continue
return key_data, hub_shades
key_data = _parse_key_response(ble_name, result)
if key_data is not None:
return key_data
raise ValueError(f"No reachable shade returned a valid key: {last_error}")
@@ -159,7 +155,7 @@ _HOMEKEY_SCHEMA = vol.Schema(
]
)
),
vol.Optional("hub_url", default="http://powerview-g3.local"): TextSelector(
vol.Optional(CONF_HUB_URL, default="http://powerview-g3.local"): TextSelector(
TextSelectorConfig(type=TextSelectorType.URL)
),
vol.Optional("home_key", default=""): TextSelector(
@@ -170,46 +166,27 @@ _HOMEKEY_SCHEMA = vol.Schema(
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for BT Battery Management System."""
"""Handle a config flow for Hunter Douglas PowerView BLE."""
VERSION = 1
VERSION = 2
MINOR_VERSION = 1
@dataclass
class DiscoveredDevice:
"""A discovered bluetooth device."""
name: str
discovery_info: BluetoothServiceInfoBleak
def __init__(self) -> None:
"""Initialize the config flow."""
self._discovered_device: ConfigFlow.DiscoveredDevice | None = None
self._discovered_devices: dict[str, ConfigFlow.DiscoveredDevice] = {}
self._manufacturer_data_hex: str = ""
self._device_name: str = ""
self._home_key: str = ""
self._hub_url: str = ""
self._hub_shades: list[HubShadeInfo] = []
def _create_entry(self) -> ConfigFlowResult:
"""Create the config entry with collected data."""
data: dict[str, str] = {
"manufacturer_data": self._manufacturer_data_hex,
CONF_HOME_KEY: self._home_key,
}
"""Create the hub config entry."""
data: dict[str, str] = {CONF_HOME_KEY: self._home_key}
if self._hub_url:
data["hub_url"] = self._hub_url
return self.async_create_entry(
title=self._device_name,
data=data,
)
data[CONF_HUB_URL] = self._hub_url
return self.async_create_entry(title="PowerView Home", data=data)
def _validate_manual_key(
self, user_input: dict[str, Any], errors: dict[str, str]
) -> bool:
"""Validate a manually entered hex key and store it.
"""Validate a manually entered hex key.
Returns True on success, False on validation error.
"""
@@ -233,7 +210,6 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Parse and validate homekey user_input, populating self state.
Returns True on success, False on validation error (errors dict is populated).
On skip, self._home_key is set to "".
"""
method = user_input.get("key_method", "skip")
@@ -247,7 +223,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
if method != "hub":
return False
hub_url = user_input.get("hub_url", "").rstrip("/")
hub_url = user_input.get(CONF_HUB_URL, "").rstrip("/")
_HUB_ERROR_MAP: dict[type[Exception], str] = {
aiohttp.ClientResponseError: "hub_http_error",
aiohttp.ClientConnectionError: "hub_connection_error",
@@ -255,14 +231,14 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
ValueError: "hub_protocol_error",
}
try:
key, hub_shades = await _fetch_key_and_shades_from_hub(self.hass, hub_url)
key = await _fetch_key_from_hub(self.hass, hub_url)
except tuple(_HUB_ERROR_MAP) as ex:
errors["hub_url"] = _HUB_ERROR_MAP[type(ex)]
LOGGER.warning("Hub key fetch failed: %s", ex)
errors[CONF_HUB_URL] = _HUB_ERROR_MAP[type(ex)]
return False
self._home_key = key.hex()
self._hub_url = hub_url
self._hub_shades = hub_shades
return True
async def async_step_bluetooth(
@@ -271,296 +247,45 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a flow initialized by Bluetooth discovery."""
LOGGER.debug("Bluetooth device detected: %s", discovery_info)
# Tag the flow with this address so HA deduplicates future
# discovery flows for the same device
await self.async_set_unique_id(discovery_info.address)
self._abort_if_unique_id_configured()
self._discovered_device = ConfigFlow.DiscoveredDevice(
discovery_info.name, discovery_info
)
self.context["title_placeholders"] = {"name": self._discovered_device.name}
return await self.async_step_bluetooth_confirm()
async def async_step_bluetooth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm bluetooth device discovery."""
assert self._discovered_device is not None
LOGGER.debug("confirm step for %s", self._discovered_device.name)
if user_input is not None:
self._manufacturer_data_hex = (
self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()
)
self._device_name = self._discovered_device.name
# Unencrypted shades can skip the homekey step entirely
if not _needs_encryption(self._manufacturer_data_hex):
await self._resolve_friendly_name()
return self._create_entry()
return await self.async_step_homekey_bluetooth()
self._set_confirm_only()
return self.async_show_form(
step_id="bluetooth_confirm",
description_placeholders={"name": self._discovered_device.name},
)
async def async_step_homekey_bluetooth(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Configure homekey for a shade discovered via Bluetooth."""
# Reuse an existing key if another shade was already configured
existing = self._existing_home_key()
if existing and user_input is None:
self._home_key = existing
await self._resolve_friendly_name()
return self._create_entry()
errors: dict[str, str] = {}
if user_input is not None and await self._validate_homekey_input(
user_input, errors
):
# Use hub name for the entry title if available
friendly = self._hub_name_for(self._device_name)
if friendly:
self._device_name = friendly
return self._create_entry()
return self.async_show_form(
step_id="homekey_bluetooth",
data_schema=_HOMEKEY_SCHEMA,
errors=errors,
description_placeholders={
"name": self._device_name,
"hub_url_example": "http://powerview-g3.local",
},
)
def _existing_entry_value(self, key: str) -> str:
"""Return the first non-empty value for *key* across configured entries."""
# If a hub entry already exists, shades are auto-discovered
for entry in self._async_current_entries():
if value := entry.data.get(key, ""):
return value
return ""
if entry.version >= 2:
return self.async_abort(reason="already_configured")
def _existing_home_key(self) -> str:
"""Return the home_key from any already-configured entry, or ''."""
return self._existing_entry_value(CONF_HOME_KEY)
async def _resolve_friendly_name(self) -> None:
"""Try to resolve BLE device name to hub friendly name."""
hub_url = self._hub_url or self._existing_entry_value("hub_url")
if not hub_url:
return
try:
shades = await _fetch_shades_from_hub(self.hass, hub_url)
for hs in shades:
if hs.ble_name == self._device_name:
self._device_name = hs.name
break
if not self._hub_url:
self._hub_url = hub_url
except (TimeoutError, aiohttp.ClientError, ValueError):
pass
def _hub_name_for(self, ble_name: str) -> str | None:
"""Return the human-readable hub name for a BLE name, or None."""
for hs in self._hub_shades:
if hs.ble_name == ble_name:
return hs.name
return None
# No hub entry yet — redirect to user setup
return await self.async_step_user()
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the user step — reuse existing key or offer a menu."""
"""Handle the user step — create a hub entry."""
LOGGER.debug("user step")
existing = self._existing_home_key()
if existing:
self._home_key = existing
self._hub_url = self._hub_url or self._existing_entry_value("hub_url")
if self._hub_url and not self._hub_shades:
with contextlib.suppress(
TimeoutError, aiohttp.ClientError, ValueError
):
self._hub_shades = await _fetch_shades_from_hub(
self.hass, self._hub_url
)
return self.async_show_menu(
step_id="user",
menu_options=["select_device", "manual"],
)
return await self.async_step_homekey()
def _build_selected_entries(
self, user_input: dict[str, Any]
) -> list[dict[str, Any]]:
"""Build config entry data for each selected shade address."""
addresses: list[str] = user_input[CONF_ADDRESS]
if isinstance(addresses, str):
addresses = [addresses]
# Only one hub entry allowed (per key, but for simplicity one total)
for entry in self._async_current_entries():
if entry.version >= 2:
return self.async_abort(reason="single_instance_allowed")
entries: list[dict[str, Any]] = []
for address in addresses:
device = self._discovered_devices[address]
ble_name = device.name
name = self._hub_name_for(ble_name) or ble_name
mfct_hex = device.discovery_info.manufacturer_data[MFCT_ID].hex()
entry_data: dict[str, str] = {
"manufacturer_data": mfct_hex,
CONF_HOME_KEY: self._home_key,
}
if self._hub_url:
entry_data["hub_url"] = self._hub_url
entries.append(
{
"address": address,
"name": name,
"data": entry_data,
}
)
return entries
async def async_step_select_device(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Select one or more BLE-discovered shades, or fall through to manual."""
LOGGER.debug("select_device step")
if user_input is not None:
entries = self._build_selected_entries(user_input)
# Kick off auto-add flows for all but the last shade
await asyncio.gather(
*(
self.hass.config_entries.flow.async_init(
DOMAIN,
context={"source": "auto_add"},
data=info,
)
for info in entries[:-1]
)
)
# Create the final entry normally (ends this flow)
last = entries[-1]
await self.async_set_unique_id(last["address"], raise_on_progress=False)
self._abort_if_unique_id_configured()
self._device_name = last["name"]
self._manufacturer_data_hex = last["data"]["manufacturer_data"]
self.context["title_placeholders"] = {"name": self._device_name}
return self._create_entry()
current_addresses = self._async_current_ids()
for discovery_info in async_discovered_service_info(self.hass, False):
address = discovery_info.address
if address in current_addresses or address in self._discovered_devices:
continue
if MFCT_ID not in discovery_info.manufacturer_data:
continue
if UUID not in discovery_info.service_uuids:
continue
self._discovered_devices[address] = ConfigFlow.DiscoveredDevice(
discovery_info.name, discovery_info
)
if not self._discovered_devices:
return await self.async_step_manual()
titles: list[SelectOptionDict] = []
for address, discovery in self._discovered_devices.items():
hub_name = self._hub_name_for(discovery.name)
label = f"{hub_name} ({discovery.name})" if hub_name else discovery.name
titles.append({"value": address, "label": label})
return self.async_show_form(
step_id="select_device",
data_schema=vol.Schema(
{
vol.Required(CONF_ADDRESS): SelectSelector(
SelectSelectorConfig(options=titles, multiple=True)
)
}
),
)
async def async_step_auto_add(
self, discovery_info: dict[str, Any]
) -> ConfigFlowResult:
"""Handle a shade queued from multi-select for individual setup."""
await self.async_set_unique_id(discovery_info["address"])
self._abort_if_unique_id_configured()
self._device_name = discovery_info["name"]
self._manufacturer_data_hex = discovery_info["data"]["manufacturer_data"]
self._home_key = discovery_info["data"].get(CONF_HOME_KEY, "")
self._hub_url = discovery_info["data"].get("hub_url", "")
self.context["title_placeholders"] = {"name": self._device_name}
return await self.async_step_auto_add_confirm()
async def async_step_auto_add_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm adding a shade discovered via multi-select."""
if user_input is not None:
return self._create_entry()
self._set_confirm_only()
return self.async_show_form(
step_id="auto_add_confirm",
description_placeholders={"name": self._device_name},
)
async def async_step_manual(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle manual entry of a BLE device address and name."""
if user_input is not None:
address = user_input[CONF_ADDRESS].upper().strip()
self._device_name = user_input["ble_name"].strip()
await self.async_set_unique_id(address, raise_on_progress=False)
self._abort_if_unique_id_configured()
self.context["title_placeholders"] = {"name": self._device_name}
self._manufacturer_data_hex = ""
return self._create_entry()
return self.async_show_form(
step_id="manual",
data_schema=vol.Schema(
{
vol.Required(CONF_ADDRESS): TextSelector(
TextSelectorConfig(type=TextSelectorType.TEXT)
),
vol.Required("ble_name"): TextSelector(
TextSelectorConfig(type=TextSelectorType.TEXT)
),
}
),
)
async def async_step_homekey(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Configure homekey — collected before device selection."""
errors: dict[str, str] = {}
if user_input is not None and await self._validate_homekey_input(
user_input, errors
):
return await self.async_step_select_device()
unique_id = _hub_unique_id(self._home_key)
await self.async_set_unique_id(unique_id, raise_on_progress=False)
self._abort_if_unique_id_configured()
return self._create_entry()
return self.async_show_form(
step_id="homekey",
step_id="user",
data_schema=_HOMEKEY_SCHEMA,
errors=errors,
description_placeholders={
"hub_url_example": "http://powerview-g3.local",
},
)