11 Commits

Author SHA1 Message Date
patman15
49bb1e79c9 add tilt functions 2025-01-07 09:50:41 +01:00
patman15
d73bb1d1c4 completed tilt control 2025-01-06 19:46:52 +01:00
patman15
e00e04159a support setting tilt 2025-01-05 19:31:26 +01:00
patman15
ea33e08da0 extend set_position() for all values 2025-01-05 12:12:16 +01:00
patman15
8c5c3d3c9a Update .gitignore 2024-12-29 13:00:15 +01:00
patman15
0299a4ed24 Merge branch 'main' into feature/type_8 2024-12-29 12:49:44 +01:00
patman15
21d31ba38a Merge branch 'main' into feature/type_8 2024-12-22 13:36:41 +01:00
patman15
b6b7e43b02 stronger typing 2024-12-22 13:35:29 +01:00
patman15
b550f98e2b Merge branch 'main' into feature/type_8 2024-12-21 09:10:50 +01:00
patman15
f6460280db Merge branch 'main' into feature/type_8 2024-12-20 21:16:37 +01:00
patman15
65b7b3814b disabled home_id filter 2024-12-20 20:41:01 +01:00
25 changed files with 127 additions and 456 deletions

View File

@@ -8,8 +8,8 @@ on:
- cron: '0 5 * * 6'
jobs:
lint:
name: "Lint the code"
ruff:
name: "Ruff"
runs-on: "ubuntu-latest"
steps:
- name: "Checkout the repository"
@@ -18,17 +18,11 @@ jobs:
- name: "Set up Python"
uses: actions/setup-python@main
with:
python-version: "3.13.2"
python-version: "3.12"
cache: "pip"
- name: "Install requirements"
run: python3 -m pip install -r requirements.txt
- name: "Run ruff"
run: ruff check .
- name: "Run mypy"
run: mypy .
- name: "Run codespell"
run: codespell -L hass
- name: "Run Ruff"
run: python3 -m ruff check .

3
.gitignore vendored
View File

@@ -152,3 +152,6 @@ cython_debug/
#.idea/
aes.py
*.bak
emu/PV_BLE_cover/PV_BLE_cover.ino
img/
*.zip

View File

@@ -8,7 +8,8 @@
> [!WARNING]
> - This integration is under development!
> - Test coverage is low, malfunction might occur.
> - The HOME_KEY is lost over updates!
> - Only devices that are **not** added to the app are controlable. It is possible to add them to the app if you just want to monitor the status (position, battery) in Home Assistant.
> - Currently only position change is supported (e.g., no tilt)
## Features
- Zero configuration
@@ -17,7 +18,7 @@
### Supported Devices
Type* | Description
-- | --
-- | --
1 | Designer Roller
4 | Roman
5 | Bottom Up
@@ -25,7 +26,6 @@ Type* | Description
10 | Duette and Applause SkyLift
19 | Provenance Woven Wood
31, 32, 84 | Vignette
39 | Parkland
42 | M25T Roller Blind
49 | AC Roller
52 | Banded Shades
@@ -39,14 +39,10 @@ The integration provides the following information about the battery
Platform | Description | Unit | Details
-- | -- | -- | --
`binary_sensor` | battery charging indicator | `bool` | true if battery is charging
`button` | identify shade | - | identify shade by LED and 3 beeps
`cover` | view/control position | `%` | percentage cover is open (100% is open)
`sensor` | SoC (state of charge) | `%` | range 100% (full), 50%, 20%, 0% (battery empty)
## Installation
> [!IMPORTANT]
> In case you added your shades to the app or a gateway, you need to [set the encryption key](#set-the-encryption-key) manually in the [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py) file after **each** update!
### Automatic
Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom repository](https://hacs.xyz/docs/faq/custom_repositories/).
@@ -61,22 +57,12 @@ Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom rep
1. Restart Home Assistant
1. In the HA UI go to "Configuration" -> "Integrations" click "+" and search for "Hunter Douglas PowerView (BLE)"
## Set the Encryption Key
Currently, there are three methods to obtain the key:
1. Via adopting a BLE shade: There is a [shade emulator](/emu/PV_BLE_cover) that works with Arduino IDE and an ESP32 device (≥ 2MiB flash, ≥ 128KiB required), e.g. [Adafruit QT Py ESP32-S3](https://www.adafruit.com/product/5426). Install and connect via serial port, then go to the PowerView app and add the shade `myPVcover` to your home. You will see a log message `set shade key: \xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx` . Copy this key. You can delete the shade from the app when done.
2. Extracting from gateway: This [script](scripts/extract_gateway3_homekey.py) is able to extract the key from a working PowerView gateway.
3. Grabbing from the app: Checkout this [post in the Home Assistant community forum](https://community.home-assistant.io/t/hunter-douglas-powerview-gen-3-integration/424836/228).
Finally, you need to manually copy the key to [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py).
> [!IMPORTANT]
> You need to update the file after **each** update!
## Known Issues
<details><summary>Shade inoperable after charging</summary>
It seems that the shades require some re-initialization after charging. The solution is currently unknown, but as a workaround you can operate the shade ones using the vendor app.
</details>
## Outlook
- Add support for encryption
- Allow parallel usage to PowerView app as "remote"
- Add support for tilt function
- Add support for further device types
## Troubleshooting
In case you have severe troubles,
@@ -86,15 +72,6 @@ In case you have severe troubles,
- disable the log (Home Assistant will prompt you to download the log), and finally
- [open an issue](https://github.com/patman15/hdpv_ble/issues/new?assignees=&labels=Bug&projects=&template=bug.yml) with a good description of what happened and attach the log.
# Thanks To
[@mannkind](https://github.com/mannkind)
[license-shield]: https://img.shields.io/github/license/patman15/hdpv_ble.svg?style=for-the-badge
[releases-shield]: https://img.shields.io/github/release/patman15/hdpv_ble.svg?style=for-the-badge
[releases]: https://github.com//patman15/hdpv_ble/releases
## Outlook
- Add tests!
- Allow parallel usage to PowerView app as "remote"
- Add support for tilt function
- Add support for further device types

View File

@@ -4,24 +4,17 @@
@license: Apache-2.0 license
"""
from bleak.backends.device import BLEDevice
from bleak.exc import BleakError
from homeassistant.components.bluetooth import async_ble_device_from_address
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
from .const import LOGGER
from .const import DOMAIN, LOGGER
from .coordinator import PVCoordinator
PLATFORMS: list[Platform] = [
Platform.BINARY_SENSOR,
Platform.COVER,
Platform.SENSOR,
Platform.BUTTON,
]
PLATFORMS: list[Platform] = [Platform.BINARY_SENSOR, Platform.COVER, Platform.SENSOR]
type ConfigEntryType = ConfigEntry[PVCoordinator]
@@ -33,7 +26,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool
if entry.unique_id is None:
raise ConfigEntryError("Missing unique ID for device.")
ble_device: BLEDevice | None = async_ble_device_from_address(
ble_device = async_ble_device_from_address(
hass=hass, address=entry.unique_id, connectable=True
)
@@ -48,6 +41,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool
except BleakError as err:
raise ConfigEntryNotReady("Unable to query device info.") from err
# Insert the coordinator in the global registry
hass.data.setdefault(DOMAIN, {})
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(coordinator.async_start())

View File

@@ -1,9 +1,9 @@
"""Hunter Douglas PowerView BLE API."""
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
import time
from typing import Final
from bleak import BleakClient
@@ -16,7 +16,6 @@ from cryptography.hazmat.primitives.ciphers.base import (
AEADDecryptionContext,
AEADEncryptionContext,
)
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
@@ -51,7 +50,6 @@ SHADE_TYPE: Final[dict[int, str]] = {
8: "Duette, Top Down Bottom Up",
9: "Duette DuoLite, Top Down Bottom Up",
33: "Duette Architella, Top Down Bottom Up",
39: "Parkland",
47: "Pleated, Top Down Bottom Up",
# top down, tilt anywhere
51: "Venetian, Tilt Anywhere",
@@ -105,18 +103,18 @@ class PowerViewBLE:
disconnected_callback=self._on_disconnect,
services=[
UUID_COV_SERVICE,
UUID_DEV_SERVICE,
# self.UUID_DEV_SERVICE,
# self.UUID_BAT_SERVICE,
],
)
self._data_event = asyncio.Event()
self._data: bytes = b""
self._data: bytearray = bytearray()
self._info: PVDeviceInfo = PVDeviceInfo()
self._is_encrypted: bool = False
self._cmd_lock: Final = asyncio.Lock()
self._cmd_next: tuple[ShadeCmd, bytes]
self._is_encrypted: bool = False
self._cipher: Final[Cipher | None] = (
Cipher(algorithms.AES(home_key), modes.CTR(bytes(16)))
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
if len(home_key) == 16
else None
)
@@ -195,7 +193,7 @@ class PowerViewBLE:
("position3", int(data[6])),
(ATTR_CURRENT_TILT_POSITION, int(data[7])),
("home_id", int.from_bytes(data[0:2], byteorder="little")),
("type_id", int(data[2])),
("type_id", int.from_bytes(data[2:3])),
("is_opening", bool(pos & 0x3 == 0x2)),
("is_closing", bool(pos & 0x3 == 0x1)),
("battery_charging", bool(pos & 0x3 == 0x3)), # observed
@@ -208,29 +206,27 @@ class PowerViewBLE:
async def set_position(
self,
pos1: int,
pos2: int = 0x8000,
pos3: int = 0x8000,
tilt: int = 0x8000,
pos2: int | None = None,
pos3: int | None = None,
tilt: int | None = None,
velocity: int = 0x0,
disconnect: bool = True,
) -> None:
"""Set position of device."""
LOGGER.debug(
"%s setting position to %i/%i/%i, tilt %i, velocity %s",
self.name,
pos1,
pos2,
pos3,
tilt,
velocity,
)
LOGGER.debug("%s setting position to %i, tilt %i", self.name, pos1, tilt)
await self._cmd(
(
ShadeCmd.SET_POSITION,
int.to_bytes(pos1*100, 2, byteorder="little")
+ int.to_bytes(pos2, 2, byteorder="little")
+ int.to_bytes(pos3, 2, byteorder="little")
+ int.to_bytes(tilt, 2, byteorder="little")
int.to_bytes(pos1, 2, byteorder="little")
+ int.to_bytes(
pos2 if pos2 is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(
pos3 if pos3 is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(
tilt if tilt is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(velocity, 1),
),
disconnect,
@@ -264,15 +260,14 @@ class PowerViewBLE:
),
)
async def identify(self, beeps: int = 0x3) -> None:
"""Identify device."""
LOGGER.debug("%s identify (%i)", self.name, beeps)
await self._cmd((ShadeCmd.IDENTIFY, bytes([min(beeps, 0xFF)])))
def _verify_response(self, data: bytes, seq_nr: int, cmd: ShadeCmd) -> bool:
def _verify_response(self, din: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
"""Verify shade response data."""
data: bytearray = din
if self._cipher is not None and self._is_encrypted:
dec: AEADDecryptionContext = self._cipher.decryptor()
data = bytearray(dec.update(din) + dec.finalize())
if len(data) < 4:
LOGGER.error("Response message too short")
LOGGER.error("Reponse message too short")
return False
if int.from_bytes(data[0:2], byteorder="little") != cmd.value & 0xFFEF:
LOGGER.warning("Response to wrong command")
@@ -328,10 +323,10 @@ class PowerViewBLE:
def _notification_handler(self, _sender, data: bytearray) -> None:
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
self._data = bytes(data)
self._data = data
if self._cipher is not None and self._is_encrypted:
dec: AEADDecryptionContext = self._cipher.decryptor()
self._data = bytes(dec.update(bytes(data)) + dec.finalize())
self._data = bytearray(dec.update(data) + dec.finalize())
LOGGER.debug(
"%s %s",
"decoded data: ".rjust(19 + len(self.name)),

View File

@@ -49,7 +49,7 @@ class PVBinarySensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySen
descr: BinarySensorEntityDescription,
unique_id: str,
) -> None:
"""Initialize PV binary sensor."""
"""Intialize PV binary sensor."""
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
self._attr_device_info = coord.device_info
self._attr_has_entity_name = True

View File

@@ -1,71 +0,0 @@
"""Hunter Douglas Powerview cover."""
from typing import Final
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.button import (
ButtonDeviceClass,
ButtonEntity,
ButtonEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceInfo, format_mac
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .const import DOMAIN, LOGGER
from .coordinator import PVCoordinator
BUTTONS_SHADE: Final = [
ButtonEntityDescription(
key="identify",
device_class=ButtonDeviceClass.IDENTIFY,
entity_category=EntityCategory.DIAGNOSTIC,
),
]
async def async_setup_entry(
_hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the demo cover platform."""
coordinator: PVCoordinator = config_entry.runtime_data
for descr in BUTTONS_SHADE:
async_add_entities([PowerViewButton(coordinator, descr)])
class PowerViewButton(PassiveBluetoothCoordinatorEntity[PVCoordinator], ButtonEntity): # type: ignore[reportIncompatibleVariableOverride]
"""Representation of a powerview shade."""
_attr_has_entity_name = True
_attr_device_class = ButtonDeviceClass.IDENTIFY
def __init__(
self,
coordinator: PVCoordinator,
description: ButtonEntityDescription,
) -> None:
"""Initialize the shade."""
self.entity_description = description
self._coord: PVCoordinator = coordinator
self._attr_device_info = self._coord.device_info
self._attr_unique_id = (
f"{DOMAIN}_{format_mac(self._coord.address)}_{ButtonDeviceClass.IDENTIFY}"
)
super().__init__(coordinator)
@property
def device_info(self) -> DeviceInfo: # type: ignore[reportIncompatibleVariableOverride]
"""Return the device_info of the device."""
return self._coord.device_info
async def async_press(self) -> None:
"""Handle the button press."""
LOGGER.debug("identify cover")
await self._coord.api.identify()

View File

@@ -4,7 +4,6 @@ from dataclasses import dataclass
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.bluetooth import (
BluetoothServiceInfoBleak,
@@ -12,11 +11,9 @@ from homeassistant.components.bluetooth import (
)
from homeassistant.config_entries import ConfigFlowResult
from homeassistant.const import CONF_ADDRESS
from homeassistant.helpers.selector import (
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
)
# from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
from .api import UUID_COV_SERVICE as UUID
from .const import DOMAIN, LOGGER, MFCT_ID
@@ -66,11 +63,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
if user_input is not None:
return self.async_create_entry(
title=self._discovered_device.name,
data={
"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[
MFCT_ID
].hex()
},
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
)
self._set_confirm_only()
@@ -96,11 +89,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_create_entry(
title=self._discovered_device.name,
data={
"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[
MFCT_ID
].hex()
},
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
)
current_addresses = self._async_current_ids()
@@ -122,7 +111,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
if not self._discovered_devices:
return self.async_abort(reason="no_devices_found")
titles: list[SelectOptionDict] = []
titles = []
for address, discovery in self._discovered_devices.items():
titles.append({"value": address, "label": discovery.name})

View File

@@ -3,6 +3,16 @@
import logging
from typing import Final
# from bleak.uuids import normalize_uuid_str
# from homeassistant.const import ( # noqa: F401
# ATTR_BATTERY_CHARGING,
# ATTR_BATTERY_LEVEL,
# ATTR_TEMPERATURE,
# ATTR_VOLTAGE,
# )
DOMAIN: Final[str] = "hunterdouglas_powerview_ble"
LOGGER: Final = logging.getLogger(__package__)
MFCT_ID: Final[int] = 2073

View File

@@ -3,7 +3,6 @@
from typing import Any
from bleak.backends.device import BLEDevice
from homeassistant.components import bluetooth
from homeassistant.components.bluetooth.const import DOMAIN as BLUETOOTH_DOMAIN
from homeassistant.components.bluetooth.passive_update_coordinator import (
@@ -81,7 +80,7 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
def _async_stop(self) -> None:
"""Shutdown coordinator and any connection."""
LOGGER.debug("%s: shutting down BMS device", self.name)
LOGGER.debug("%s: shuting down BMS device", self.name)
self.hass.async_create_task(self.api.disconnect())
super()._async_stop()

View File

@@ -3,7 +3,6 @@
from typing import Any, Final
from bleak.exc import BleakError
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
@@ -34,14 +33,11 @@ async def async_setup_entry(
"""Set up the demo cover platform."""
coordinator: PVCoordinator = config_entry.runtime_data
model: Final[str|None] = coordinator.dev_details.get("model")
entities: list[PowerViewCover] = []
if model in ["39"]:
entities.append(PowerViewCoverTiltOnly(coordinator))
else:
entities.append(PowerViewCover(coordinator))
async_add_entities(entities)
async_add_entities(
[PowerViewCoverTilt(coordinator)]
if coordinator.dev_details.get("model") in ["51", "62"]
else [PowerViewCover(coordinator)]
)
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
@@ -201,7 +197,6 @@ class PowerViewCoverTilt(PowerViewCover):
self,
coordinator: PVCoordinator,
) -> None:
"""Initialize the shade with tilt."""
LOGGER.debug("%s: init() PowerViewCoverTilt", coordinator.name)
super().__init__(coordinator)
@@ -253,45 +248,3 @@ class PowerViewCoverTilt(PowerViewCover):
LOGGER.debug("close cover tilt")
_kwargs = {**kwargs, ATTR_TILT_POSITION: CLOSED_POSITION}
await self.async_set_cover_tilt_position(**_kwargs)
class PowerViewCoverTiltOnly(PowerViewCoverTilt):
"""Representation of a PowerView shade with additional tilt functionality."""
OPENCLOSED_THRESHOLD = 5
_attr_device_class = CoverDeviceClass.BLIND
_attr_supported_features = (
CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.STOP_TILT
| CoverEntityFeature.SET_TILT_POSITION
)
def __init__(
self,
coordinator: PVCoordinator,
) -> None:
"""Initialize the shade with tilt only."""
LOGGER.debug("%s: init() PowerViewCoverTiltOnly", coordinator.name)
super().__init__(coordinator)
@property
def is_opening(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is opening or not."""
return False
@property
def is_closing(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is closing or not."""
return False
@property
def is_closed(self) -> bool: # type: ignore[reportIncompatibleVariableOverride]
"""Return if the cover is closed."""
return isinstance(self.current_cover_tilt_position, int) and (
self.current_cover_tilt_position
>= OPEN_POSITION - PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
or self.current_cover_tilt_position
<= CLOSED_POSITION + PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
)

View File

@@ -16,5 +16,5 @@
"issue_tracker": "https://github.com/patman15/hdpv_ble/issues",
"loggers": ["hunterdouglas_powerview_ble"],
"requirements": ["cryptography>=43.0.0"],
"version": "0.23"
"version": "0.22"
}

View File

@@ -3,8 +3,14 @@
from homeassistant.components.bluetooth.passive_update_coordinator import (
PassiveBluetoothCoordinatorEntity,
)
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
from homeassistant.components.sensor.const import SensorDeviceClass, SensorStateClass
from homeassistant.components.sensor import (
SensorEntity,
SensorEntityDescription,
)
from homeassistant.components.sensor.const import (
SensorDeviceClass,
SensorStateClass,
)
from homeassistant.const import (
ATTR_BATTERY_LEVEL,
PERCENTAGE,
@@ -61,7 +67,7 @@ class PVSensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], SensorEntity):
def __init__(
self, pv_dev: PVCoordinator, descr: SensorEntityDescription, unique_id: str
) -> None:
"""Initialize the BMS sensor."""
"""Intitialize the BMS sensor."""
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
self._attr_device_info = pv_dev.device_info
self.entity_description = descr

View File

@@ -2,12 +2,6 @@
* Emulate a Hunter Douglas PowerView cover device using ESP32
* used e.g. to gain the home_key from an existing installation via BLE
*
* REQUIRES:
* - ESP32 Board Definitions 3.0.x (tested on 3.0.7)
* - WolfSSL 5.7.x (tested on 5.7.6)
* - Phone Region: Potentially an alternative region depending on the app response
* - e.g. To add Parkland shades in the US, phone region set to the UK temporarily
*
* TODO:
* - cleanup code
* - think about emulating a remote
@@ -17,13 +11,9 @@
*/
#define NAME "myPVcover"
const uint16_t SW_VERSION = 391;
const char *SERIAL_NR = "01234567890ABCDEF";
const uint16_t TYP_ID = 42; // 62
const uint16_t MODEL_ID = 224;
const uint16_t FW_REVISION = 27;
const uint32_t HW_REVISION = 171103;
const uint8_t BATTERY_LEVEL = 42;
#define FW_VERSION "391"
#define SERIAL_NR "01234567890ABCDEF"
#include <BLEDevice.h>
#include <BLEServer.h>
@@ -50,11 +40,7 @@ int devId = INVALID_DEVID; //if not using async INVALID_DEVID is default
#define DEV_SERVICE_UUID BLEUUID("180A")
#define SER_CHAR_UUID BLEUUID("2A25")
#define MAN_CHAR_UUID BLEUUID("2A29")
#define MOD_CHAR_UUID BLEUUID("2A24")
#define FWR_CHAR_UUID BLEUUID("2A26")
#define HWR_CHAR_UUID BLEUUID("2A27")
#define SWR_CHAR_UUID BLEUUID("2A28")
#define SWC_CHAR_UUID BLEUUID("2A28")
#define BAT_SERVICE_UUID BLEUUID("180F")
#define BAT_CHAR_UUID BLEUUID("2A19")
@@ -83,7 +69,7 @@ struct notification {
};
BLECharacteristic *pCharacteristic_cover, *pCharacteristic_fw, *pCharacteristic_unknown, *pCharacteristic_bat;
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser, *pCharacteristic_man, *pCharacteristic_mod, *pCharacteristic_fwr, *pCharacteristic_hwr;
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser;
BLEServer *pServer = NULL;
bool deviceConnected = false;
bool oldDeviceConnected = false;
@@ -172,12 +158,12 @@ void decode(BLECharacteristic *pChar) {
memcpy((void *)&msg, data_dec, 4);
Serial.printf("\t message: SRV: %02x, CMD %02x, SEQ %i, LEN %i\n", msg.serviceID, msg.cmdID, msg.sequence, msg.data_len);
// special responses (static data!)
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, (uint8_t)(SW_VERSION & 0xFF), (uint8_t)(SW_VERSION >> 8), 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, TYP_ID, MODEL_ID, 0x08 }; // HW diagnostics
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
// sepecial responses (static data!)
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x2a, 0xe0, 0x08 }; // HW diagnostics
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
Serial.print("\t\t");
switch ((msg.serviceID << 8) | msg.cmdID) {
@@ -193,7 +179,7 @@ void decode(BLECharacteristic *pChar) {
break;
case 0xF711:
// identify
Serial.printf("identify: %i times\n", data_dec[4]);
Serial.printf("identify: %i\n", data_dec[4]);
resp_size = set_response(&response, (const message *)data_dec);
break;
case 0xF7B8:
@@ -222,8 +208,8 @@ void decode(BLECharacteristic *pChar) {
case 0xFB02:
// set shade key
Serial.print("set shade key: ");
print_hex(&data_raw[4], data_len - 4, "\\x", "");
// set response before key, to acknowledge unencrypted
print_hex(&data_raw[4], data_len - 4, "\\x");
// set resonse before key, to acknowledge unencrypted
resp_size = set_response(&response, (const message *)data_dec);
if (msg.data_len == 16) {
memcpy(home_key, &data_raw[4], 16);
@@ -265,9 +251,7 @@ void decode(BLECharacteristic *pChar) {
resp_size = set_response(&response, (const message *)data_dec);
break;
default:
Serial.println(F("*********************************** unknown message (try ACK)"));
resp_size = set_response(&response, (const message *)data_dec);
break;
Serial.println(F("*********************************** unknown message"));
}
if (resp_size) {
pChar->setValue((uint8_t *)&response, resp_size);
@@ -307,7 +291,7 @@ class coverCallbacks : public BLECharacteristicCallbacks {
void onNotify(BLECharacteristic *pCharacteristic) {
Serial.printf("Cover onNotify() %s\n", pCharacteristic->toString().c_str());
}
void onStatus(BLECharacteristic *pCharacteristic, Status s, uint32_t code) {
Serial.printf("Cover onStatus() %s: %s\n", BLEstate[s], pCharacteristic->toString().c_str());
}
@@ -357,7 +341,6 @@ class genericCallbacks : public BLECharacteristicCallbacks {
void setup() {
Serial.begin(115200);
delay(1000); // wait for terminal to be ready
Serial.println(NAME " initializing ...");
BLEDevice::init(NAME);
@@ -390,7 +373,8 @@ pDesc1->setValue("cover");*/
BAT_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_bat->setCallbacks(new batteryCallbacks());
pCharacteristic_bat->setValue((uint8_t *)&BATTERY_LEVEL, 1);
uint8_t battery_level = 42;
pCharacteristic_bat->setValue(&battery_level, 1);
pCharacteristic_bat->addDescriptor(new BLE2902());
BLEService *pFWService = pServer->createService(FW_SERVICE_UUID);
@@ -406,9 +390,9 @@ pDesc1->setValue("cover");*/
BLEService *pDEVService = pServer->createService(DEV_SERVICE_UUID);
pCharacteristic_dev = pDEVService->createCharacteristic(
SWR_CHAR_UUID,
SWC_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_dev->setValue(String(SW_VERSION));
pCharacteristic_dev->setValue(FW_VERSION);
pCharacteristic_dev->setCallbacks(new genericCallbacks());
pCharacteristic_ser = pDEVService->createCharacteristic(
SER_CHAR_UUID,
@@ -416,28 +400,6 @@ pDesc1->setValue("cover");*/
pCharacteristic_ser->setValue(SERIAL_NR);
pCharacteristic_ser->setCallbacks(new genericCallbacks());
pCharacteristic_man = pDEVService->createCharacteristic(
MAN_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_man->setValue("Hunter Douglas");
pCharacteristic_man->setCallbacks(new genericCallbacks());
pCharacteristic_mod = pDEVService->createCharacteristic(
MOD_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_mod->setValue(String(TYP_ID));
pCharacteristic_mod->setCallbacks(new genericCallbacks());
pCharacteristic_fwr = pDEVService->createCharacteristic(
FWR_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_fwr->setValue(String(FW_REVISION));
pCharacteristic_fwr->setCallbacks(new genericCallbacks());
pCharacteristic_hwr = pDEVService->createCharacteristic(
HWR_CHAR_UUID,
BLECharacteristic::PROPERTY_READ);
pCharacteristic_hwr->setValue(String(HW_REVISION));
pCharacteristic_hwr->setCallbacks(new genericCallbacks());
// Start the services
pCovService->start();
pFWService->start();
@@ -446,10 +408,9 @@ pDesc1->setValue("cover");*/
// Start advertising
BLEAdvertisementData AdvertisementData;
const char adv[] = { 0x19, 0x08, 0x00, 0x00, TYP_ID, 0x00, 0x00, 0x00, 0x00, 0x00, 0xA2 };
// Hunter Douglas ^^ -- ^^ ^^key ^^ ^--pos1--^
AdvertisementData.setManufacturerData(String(adv, 11));
const String manufacturerData = String("\x19\x08\x00\x00\x2A\x00\x00\x00\x00\x00\xA2", 11);
// Hunter Douglas ^^--^^ ^^key^^ ^^ ID-Type
AdvertisementData.setManufacturerData(manufacturerData);
AdvertisementData.setPartialServices(BLEUUID(COVER_SERVICE_UUID));
AdvertisementData.setFlags((1 << 2) | (1 << 1)); // [BR/EDR Not Supported] | [LE General Discoverable Mode]

Binary file not shown.

Before

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 53 KiB

View File

@@ -1,17 +1,7 @@
# pyproject.toml
[project]
name = "hunterdouglas_powerview_ble"
classifiers = [
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.13"
]
readme = "README.md"
requires-python = ">=3.13.2"
[project.urls]
"Source Code" = "https://github.com/patman15/hdpv_ble/"
"Bug Reports" = "https://github.com/patman15/hdpv_ble/issues"
requires-python = ">=3.12.0"
[tool.pytest.ini_options]
minversion = "8.0"
@@ -24,9 +14,9 @@ testpaths = [
]
asyncio_mode = "auto"
# ruff settings from HA 2025.2.2
# ruff configuration taken from HA 2024.11.2 (less ignores)
[tool.ruff]
required-version = ">=0.9.1"
required-version = ">=0.6.8"
[tool.ruff.lint]
select = [
@@ -45,10 +35,8 @@ select = [
"B017", # pytest.raises(BaseException) should be considered evil
"B018", # Found useless attribute access. Either assign it to a variable or remove it.
"B023", # Function definition does not bind loop variable {name}
"B024", # `{name}` is an abstract base class, but it has no abstract methods or properties
"B026", # Star-arg unpacking after a keyword argument is strongly discouraged
"B032", # Possible unintentional type annotation (using :). Did you mean to assign (using =)?
"B035", # Dictionary comprehension uses static key
"B904", # Use raise from to specify exception cause
"B905", # zip() without an explicit strict= parameter
"BLE",
@@ -82,27 +70,12 @@ select = [
"RSE", # flake8-raise
"RUF005", # Consider iterable unpacking instead of concatenation
"RUF006", # Store a reference to the return value of asyncio.create_task
"RUF007", # Prefer itertools.pairwise() over zip() when iterating over successive pairs
"RUF008", # Do not use mutable default values for dataclass attributes
"RUF010", # Use explicit conversion flag
"RUF013", # PEP 484 prohibits implicit Optional
"RUF016", # Slice in indexed access to type {value_type} uses type {index_type} instead of an integer
"RUF017", # Avoid quadratic list summation
"RUF018", # Avoid assignment expressions in assert statements
"RUF019", # Unnecessary key check before dictionary access
"RUF020", # {never_like} | T is equivalent to T
"RUF021", # Parenthesize a and b expressions when chaining and and or together, to make the precedence clear
"RUF022", # Sort __all__
"RUF023", # Sort __slots__
"RUF024", # Do not pass mutable objects as values to dict.fromkeys
"RUF026", # default_factory is a positional-only argument to defaultdict
"RUF030", # print() call in assert statement is likely unintentional
"RUF032", # Decimal() called with float literal argument
"RUF033", # __post_init__ method with argument defaults
"RUF034", # Useless if-else condition
"RUF100", # Unused `noqa` directive
"RUF101", # noqa directives that use redirected rule codes
"RUF200", # Failed to parse pyproject.toml: {message}
# "RUF100", # Unused `noqa` directive; temporarily every now and then to clean them up
"S102", # Use of exec detected
"S103", # bad-file-permissions
"S108", # hardcoded-temp-file
@@ -115,7 +88,7 @@ select = [
"S317", # suspicious-xml-sax-usage
"S318", # suspicious-xml-mini-dom-usage
"S319", # suspicious-xml-pull-dom-usage
# "S320", # suspicious-xmle-tree-usage
"S320", # suspicious-xmle-tree-usage
"S601", # paramiko-call
"S602", # subprocess-popen-with-shell-equals-true
"S604", # call-with-shell-equals-true
@@ -126,7 +99,7 @@ select = [
"SLOT", # flake8-slots
"T100", # Trace found: {name} used
"T20", # flake8-print
"TC", # flake8-type-checking
"TCH", # flake8-type-checking
"TID", # Tidy imports
"TRY", # tryceratops
"UP", # pyupgrade
@@ -146,12 +119,13 @@ ignore = [
# "PLC1901", # {existing} can be simplified to {replacement} as an empty string is falsey; too many false positives
# "PLR0911", # Too many return statements ({returns} > {max_returns})
# "PLR0912", # Too many branches ({branches} > {max_branches})
"PLR0913", # Too many arguments to function call ({c_args} > {max_args})
# "PLR0913", # Too many arguments to function call ({c_args} > {max_args})
# "PLR0915", # Too many statements ({statements} > {max_statements})
"PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable
# "PLW2901", # Outer {outer_kind} variable {name} overwritten by inner {inner_kind} target
# "PT004", # Fixture {fixture} does not return anything, add leading underscore
# "PT011", # pytest.raises({exception}) is too broad, set the `match` parameter or use a more specific exception
"PT018", # Assertion should be broken down into multiple parts
# "PT018", # Assertion should be broken down into multiple parts
# "RUF001", # String contains ambiguous unicode character.
# "RUF002", # Docstring contains ambiguous unicode character.
# "RUF003", # Comment contains ambiguous unicode character.
@@ -162,12 +136,14 @@ ignore = [
# "SIM115", # Use context handler for opening files
# Moving imports into type-checking blocks can mess with pytest.patch()
"TC001", # Move application import {} into a type-checking block
"TC002", # Move third-party import {} into a type-checking block
"TC003", # Move standard library import {} into a type-checking block
"TCH001", # Move application import {} into a type-checking block
"TCH002", # Move third-party import {} into a type-checking block
"TCH003", # Move standard library import {} into a type-checking block
"TRY003", # Avoid specifying long messages outside the exception class
"TRY400", # Use `logging.exception` instead of `logging.error`
# Ignored due to performance: https://github.com/charliermarsh/ruff/issues/2923
"UP038", # Use `X | Y` in `isinstance` call instead of `(X, Y)`
# May conflict with the formatter, https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
"W191",
@@ -185,14 +161,3 @@ ignore = [
"PLE0605"
]
[tool.ruff.lint.isort]
force-sort-within-sections = true
known-first-party = [
"homeassistant",
]
combine-as-imports = true
split-on-trailing-comma = false
[tool.ruff.lint.per-file-ignores]
"scripts/*" = ["T201"]

View File

@@ -1,6 +1,4 @@
homeassistant==2025.11.0
homeassistant==2024.11.0
pip>=21.3.1
ruff>=0.9.1,<=0.15.0
types-requests
mypy~=1.19.1
codespell
ruff==0.6.8

View File

@@ -2,9 +2,9 @@
wheel
home-assistant-bluetooth
habluetooth>=5.3.0
habluetooth>=3.6.0
bluetooth-adapters
pytest>=8.4.1
pytest>=8.3.3
pytest-cov>=5.0.0
pytest-socket>=0.7.0
pytest-asyncio>=0.24.0
@@ -16,10 +16,10 @@ aiohttp
aiohttp_cors
aiohttp-fast-url-dispatcher
aiohttp-zlib-ng
bleak>=1.0.1
bleak-retry-connector>=4.4.3
bleak>=0.22.3
bleak-retry-connector>=3.6.0
bluetooth-data-tools
pyserial-asyncio
pyudev
pytest-homeassistant-custom-component==0.13.294
pytest-homeassistant-custom-component==0.13.181

View File

@@ -1 +0,0 @@
"""Script to extract PowerView homekey from a G3 PowerView Gateway."""

View File

@@ -1,102 +0,0 @@
"""Extract PowerView homekey from a G3 PowerView Gateway."""
import base64
import json
import struct
from typing import Any, Final
import requests
HUB: Final[str] = "http://powerview-g3.local"
TIMEOUT: Final[int] = 10
def create_request(sid: int, cid: int, sequence_id: int, data: bytes) -> bytes:
"""Assemble a request frame for the PowerView protocol."""
return struct.pack("<BBBB", sid, cid, sequence_id, len(data)) + data
def decode_response(packet: bytes) -> dict[str, Any]:
"""Decode a response frame from the PowerView protocol."""
if len(packet) < 4:
raise ValueError("Packet size too small")
sid, cid, sequence_id, length = struct.unpack("<BBBB", packet[0:4])
if len(packet) != 4 + length:
raise ValueError("Not all data present")
if length < 1:
raise ValueError("No errorCode present")
(error_code,) = struct.unpack("<B", packet[4:5])
data: Final[bytes] = packet[5:]
return {
"cid": cid,
"sid": sid,
"sequenceId": sequence_id,
"errorCode": error_code,
"data": data,
}
def create_get_shade_key_request(sequence_id) -> bytes:
"""Create a GetShadeKey request frame."""
return create_request(251, 18, sequence_id, b"")
def get_shade_key(hub: str, ble_name) -> bytes:
"""Get the homekey for a shade."""
try:
shades_exec_resp: requests.Response = requests.post(
hub + "/home/shades/exec?shades=" + ble_name,
json={"hex": create_get_shade_key_request(1).hex()},
timeout=TIMEOUT,
)
shades_exec_resp.raise_for_status()
except requests.exceptions.RequestException as ex:
print(f"Unable to send GetShadeKey {ex!s}")
raise
result: dict = json.loads(shades_exec_resp.content)
if result.get("err") != 0 or len(result.get("responses", [])) != 1:
raise OSError("Error when attempting GetShadeKey")
response: Final[bytes] = bytes.fromhex(result["responses"][0]["hex"])
dec_resp: Final[dict[str, Any]] = decode_response(response)
if dec_resp["errorCode"] != 0:
raise ValueError("BLE errorCode is not 0")
if len(dec_resp["data"]) != 16:
raise ValueError("Expected 16 byte homekey")
return dec_resp["data"]
def main(hub: str) -> int:
"""Extract the homekeys from all shades."""
try:
shades_resp: requests.Response = requests.get(
hub + "/home/shades", timeout=TIMEOUT
)
shades_resp.raise_for_status()
except requests.exceptions.RequestException as ex:
print(f"Unable to get list of shades:\n\t{ex!s}")
return -1
shades = json.loads(shades_resp.content)
print(f"Found {len(shades)} shades, interrogating")
for shade in shades:
name: str = base64.b64decode(shade["name"]).decode("utf-8")
key: bytes = get_shade_key(hub, shade["bleName"])
print(f"Shade '{name}':")
print(f"\tBLE name: '{shade['bleName']}'")
print(f"\tHomeKey: {key.hex()}")
return 0
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser(
description="Extract PowerView homekey from a G3 PowerView Gateway"
)
parser.add_argument("hub", nargs="?", help="URL to HUB", default=HUB)
args = parser.parse_args()
sys.exit(main(**vars(args)))