"""Config flow for BLE Battery Management System integration.""" import asyncio import base64 import struct from dataclasses import dataclass from typing import Any import aiohttp import voluptuous as vol from homeassistant import config_entries from homeassistant.components.bluetooth import ( BluetoothServiceInfoBleak, async_discovered_service_info, ) from homeassistant.config_entries import ConfigFlowResult from homeassistant.const import CONF_ADDRESS from homeassistant.core import HomeAssistant from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.selector import ( SelectOptionDict, SelectSelector, SelectSelectorConfig, TextSelector, TextSelectorConfig, TextSelectorType, ) from .api import UUID_COV_SERVICE as UUID from .const import CONF_HOME_KEY, DOMAIN, LOGGER, MFCT_ID def _needs_encryption(manufacturer_data_hex: str) -> bool: """Return True if the BLE advertisement indicates encryption (home_id != 0).""" data = bytearray.fromhex(manufacturer_data_hex) if len(data) < 2: return False home_id = int.from_bytes(data[0:2], byteorder="little") return home_id != 0 @dataclass class HubShadeInfo: """Shade metadata from the PowerView hub.""" name: str # Human-readable name (decoded from base64) ble_name: str # BLE advertisement name, e.g. "DUE:94ED" async def _fetch_key_and_shades_from_hub( hass: HomeAssistant, hub_url: str ) -> tuple[bytes, list[HubShadeInfo]]: """Fetch 16-byte homekey and shade list from a PowerView G3 hub. Returns (key, shade_list). The key is network-wide so any reachable shade returns the same value. The shade list contains human-readable names that can be used to label BLE-discovered devices. Raises ValueError on protocol/key errors. Raises aiohttp.ClientError on network errors. Raises asyncio.TimeoutError on timeout. """ session = async_get_clientsession(hass) timeout = aiohttp.ClientTimeout(total=10) # Get list of shades from hub async with session.get(f"{hub_url}/home/shades", timeout=timeout) as resp: resp.raise_for_status() shades = await resp.json(content_type=None) if not shades: raise ValueError("No shades found on the hub") # Parse shade metadata (name is base64-encoded on the hub) hub_shades: list[HubShadeInfo] = [] for shade in shades: ble_name = shade.get("bleName", "") if not ble_name: continue name_b64 = shade.get("name", "") try: name = base64.b64decode(name_b64).decode("utf-8") if name_b64 else ble_name except Exception: # noqa: BLE001 name = ble_name hub_shades.append(HubShadeInfo(name=name, ble_name=ble_name)) # GetShadeKey BLE request: sid=251, cid=18, seqId=1, data_len=0 request_frame = struct.pack(" None: """Initialize the config flow.""" self._discovered_device: ConfigFlow.DiscoveredDevice | None = None self._discovered_devices: dict[str, ConfigFlow.DiscoveredDevice] = {} self._manufacturer_data_hex: str = "" self._device_name: str = "" self._home_key: str = "" self._hub_shades: list[HubShadeInfo] = [] def _create_entry(self) -> ConfigFlowResult: """Create the config entry with collected data.""" return self.async_create_entry( title=self._device_name, data={ "manufacturer_data": self._manufacturer_data_hex, CONF_HOME_KEY: self._home_key, }, ) async def async_step_bluetooth( self, discovery_info: BluetoothServiceInfoBleak ) -> ConfigFlowResult: """Handle a flow initialized by Bluetooth discovery.""" LOGGER.debug("Bluetooth device detected: %s", discovery_info) await self.async_set_unique_id(discovery_info.address) self._abort_if_unique_id_configured() self._discovered_device = ConfigFlow.DiscoveredDevice( discovery_info.name, discovery_info ) self.context["title_placeholders"] = {"name": self._discovered_device.name} return await self.async_step_bluetooth_confirm() async def async_step_bluetooth_confirm( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Confirm bluetooth device discovery.""" assert self._discovered_device is not None LOGGER.debug("confirm step for %s", self._discovered_device.name) if user_input is not None: self._manufacturer_data_hex = ( self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex() ) self._device_name = self._discovered_device.name # Unencrypted shades can skip the homekey step entirely if not _needs_encryption(self._manufacturer_data_hex): return self._create_entry() return await self.async_step_homekey_bluetooth() self._set_confirm_only() return self.async_show_form( step_id="bluetooth_confirm", description_placeholders={"name": self._discovered_device.name}, ) async def async_step_homekey_bluetooth( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Configure homekey for a shade discovered via Bluetooth.""" # Reuse an existing key if another shade was already configured existing = self._existing_home_key() if existing and user_input is None: self._home_key = existing return self._create_entry() errors: dict[str, str] = {} if user_input is not None: method = user_input.get("key_method", "skip") if method == "skip": self._home_key = "" return self._create_entry() elif method == "manual": raw = user_input.get("home_key", "").strip() if "\\x" in raw: raw = raw.replace("\\x", "") if len(raw) != 32: errors["home_key"] = "invalid_key_length" else: try: bytes.fromhex(raw) except ValueError: errors["home_key"] = "invalid_key_format" else: self._home_key = raw.lower() return self._create_entry() elif method == "hub": hub_url = user_input.get("hub_url", "").rstrip("/") try: key, hub_shades = await _fetch_key_and_shades_from_hub( self.hass, hub_url ) self._home_key = key.hex() # Use hub name for the entry title if available for hs in hub_shades: if hs.ble_name == self._device_name: self._device_name = hs.name break return self._create_entry() except aiohttp.ClientResponseError: errors["hub_url"] = "hub_http_error" except aiohttp.ClientConnectionError: errors["hub_url"] = "hub_connection_error" except (asyncio.TimeoutError, TimeoutError): errors["hub_url"] = "hub_timeout" except ValueError: errors["hub_url"] = "hub_protocol_error" return self.async_show_form( step_id="homekey_bluetooth", data_schema=vol.Schema( { vol.Required("key_method", default="hub"): SelectSelector( SelectSelectorConfig( options=[ { "value": "hub", "label": "Fetch automatically from PowerView hub", }, { "value": "manual", "label": "Enter key manually (32 hex characters)", }, { "value": "skip", "label": "Skip (no key — controls disabled for encrypted shades)", }, ] ) ), vol.Optional("hub_url", default="http://powerview-g3.local"): TextSelector( TextSelectorConfig(type=TextSelectorType.URL) ), vol.Optional("home_key", default=""): TextSelector( TextSelectorConfig(type=TextSelectorType.TEXT) ), } ), errors=errors, description_placeholders={"name": self._device_name}, ) def _existing_home_key(self) -> str: """Return the home_key from any already-configured entry, or ''.""" for entry in self._async_current_entries(): key = entry.data.get(CONF_HOME_KEY, "") if key: return key return "" def _hub_name_for(self, ble_name: str) -> str | None: """Return the human-readable hub name for a BLE name, or None.""" for hs in self._hub_shades: if hs.ble_name == ble_name: return hs.name return None async def async_step_user( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle the user step — reuse existing key or collect one.""" LOGGER.debug("user step") existing = self._existing_home_key() if existing: self._home_key = existing return await self.async_step_select_device() return await self.async_step_homekey() async def async_step_select_device( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Select one or more BLE-discovered shades, or fall through to manual.""" LOGGER.debug("select_device step") if user_input is not None: addresses: list[str] = user_input[CONF_ADDRESS] if isinstance(addresses, str): addresses = [addresses] # Build entry info for every selected shade entries: list[dict[str, Any]] = [] for address in addresses: device = self._discovered_devices[address] ble_name = device.name name = self._hub_name_for(ble_name) or ble_name mfct_hex = device.discovery_info.manufacturer_data[MFCT_ID].hex() entries.append( { "address": address, "name": name, "data": { "manufacturer_data": mfct_hex, CONF_HOME_KEY: self._home_key, }, } ) # Kick off auto-add flows for all but the last shade for info in entries[:-1]: await self.hass.config_entries.flow.async_init( DOMAIN, context={"source": "auto_add"}, data=info, ) # Create the final entry normally (ends this flow) last = entries[-1] await self.async_set_unique_id(last["address"], raise_on_progress=False) self._abort_if_unique_id_configured() self._device_name = last["name"] self._manufacturer_data_hex = last["data"]["manufacturer_data"] self.context["title_placeholders"] = {"name": self._device_name} return self._create_entry() current_addresses = self._async_current_ids() for discovery_info in async_discovered_service_info(self.hass, False): address = discovery_info.address if address in current_addresses or address in self._discovered_devices: continue if MFCT_ID not in discovery_info.manufacturer_data: continue if UUID not in discovery_info.service_uuids: continue self._discovered_devices[address] = ConfigFlow.DiscoveredDevice( discovery_info.name, discovery_info ) if not self._discovered_devices: return await self.async_step_manual() titles: list[SelectOptionDict] = [] for address, discovery in self._discovered_devices.items(): hub_name = self._hub_name_for(discovery.name) label = f"{hub_name} ({discovery.name})" if hub_name else discovery.name titles.append({"value": address, "label": label}) return self.async_show_form( step_id="select_device", data_schema=vol.Schema( { vol.Required(CONF_ADDRESS): SelectSelector( SelectSelectorConfig(options=titles, multiple=True) ) } ), ) async def async_step_auto_add( self, data: dict[str, Any] ) -> ConfigFlowResult: """Create a config entry for a shade selected via multi-select.""" await self.async_set_unique_id(data["address"]) self._abort_if_unique_id_configured() return self.async_create_entry(title=data["name"], data=data["data"]) async def async_step_manual( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle manual entry of a BLE device address and name.""" if user_input is not None: address = user_input[CONF_ADDRESS].upper().strip() self._device_name = user_input["ble_name"].strip() await self.async_set_unique_id(address, raise_on_progress=False) self._abort_if_unique_id_configured() self.context["title_placeholders"] = {"name": self._device_name} self._manufacturer_data_hex = "" return self._create_entry() return self.async_show_form( step_id="manual", data_schema=vol.Schema( { vol.Required(CONF_ADDRESS): TextSelector( TextSelectorConfig(type=TextSelectorType.TEXT) ), vol.Required("ble_name"): TextSelector( TextSelectorConfig(type=TextSelectorType.TEXT) ), } ), ) async def async_step_homekey( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Configure homekey — collected before device selection.""" errors: dict[str, str] = {} if user_input is not None: method = user_input.get("key_method", "skip") if method == "skip": self._home_key = "" return await self.async_step_select_device() elif method == "manual": raw = user_input.get("home_key", "").strip() # Accept \xNN\xNN... format (e.g. from ESP32 emulator serial log) if "\\x" in raw: raw = raw.replace("\\x", "") if len(raw) != 32: errors["home_key"] = "invalid_key_length" else: try: bytes.fromhex(raw) except ValueError: errors["home_key"] = "invalid_key_format" else: self._home_key = raw.lower() return await self.async_step_select_device() elif method == "hub": hub_url = user_input.get("hub_url", "").rstrip("/") try: key, hub_shades = await _fetch_key_and_shades_from_hub( self.hass, hub_url ) self._home_key = key.hex() self._hub_shades = hub_shades return await self.async_step_select_device() except aiohttp.ClientResponseError: errors["hub_url"] = "hub_http_error" except aiohttp.ClientConnectionError: errors["hub_url"] = "hub_connection_error" except (asyncio.TimeoutError, TimeoutError): errors["hub_url"] = "hub_timeout" except ValueError: errors["hub_url"] = "hub_protocol_error" return self.async_show_form( step_id="homekey", data_schema=vol.Schema( { vol.Required("key_method", default="hub"): SelectSelector( SelectSelectorConfig( options=[ { "value": "hub", "label": "Fetch automatically from PowerView hub", }, { "value": "manual", "label": "Enter key manually (32 hex characters)", }, { "value": "skip", "label": "Skip (no key — controls disabled for encrypted shades)", }, ] ) ), vol.Optional("hub_url", default="http://powerview-g3.local"): TextSelector( TextSelectorConfig(type=TextSelectorType.URL) ), vol.Optional("home_key", default=""): TextSelector( TextSelectorConfig(type=TextSelectorType.TEXT) ), } ), errors=errors, )