Compare commits
21 Commits
7-support-
...
04c7036351
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
04c7036351 | ||
|
|
d66cf61887 | ||
|
|
5d498b8753 | ||
|
|
b558083b50 | ||
|
|
3775496936 | ||
|
|
883aca753e | ||
|
|
fe3646df27 | ||
|
|
bae4158a3c | ||
|
|
d9ebc54026 | ||
|
|
62bbfd7361 | ||
|
|
30dff09bb1 | ||
|
|
74d906151e | ||
|
|
9773e5df65 | ||
|
|
b2d5335e1d | ||
|
|
a6aaf4d727 | ||
|
|
f2ad61a016 | ||
|
|
f6ec17c9b2 | ||
|
|
d79096357d | ||
|
|
52f7390fc0 | ||
|
|
c9a27388af | ||
|
|
054f35f838 |
16
.github/workflows/lint.yml
vendored
16
.github/workflows/lint.yml
vendored
@@ -8,8 +8,8 @@ on:
|
||||
- cron: '0 5 * * 6'
|
||||
|
||||
jobs:
|
||||
ruff:
|
||||
name: "Ruff"
|
||||
lint:
|
||||
name: "Lint the code"
|
||||
runs-on: "ubuntu-latest"
|
||||
steps:
|
||||
- name: "Checkout the repository"
|
||||
@@ -18,11 +18,17 @@ jobs:
|
||||
- name: "Set up Python"
|
||||
uses: actions/setup-python@main
|
||||
with:
|
||||
python-version: "3.12"
|
||||
python-version: "3.13.2"
|
||||
cache: "pip"
|
||||
|
||||
- name: "Install requirements"
|
||||
run: python3 -m pip install -r requirements.txt
|
||||
|
||||
- name: "Run Ruff"
|
||||
run: python3 -m ruff check .
|
||||
- name: "Run ruff"
|
||||
run: ruff check .
|
||||
|
||||
- name: "Run mypy"
|
||||
run: mypy .
|
||||
|
||||
- name: "Run codespell"
|
||||
run: codespell -L hass
|
||||
27
README.md
27
README.md
@@ -8,7 +8,7 @@
|
||||
> [!WARNING]
|
||||
> - This integration is under development!
|
||||
> - Test coverage is low, malfunction might occur.
|
||||
> - Currently only position change is supported (e.g., no tilt)
|
||||
> - The HOME_KEY is lost over updates!
|
||||
|
||||
## Features
|
||||
- Zero configuration
|
||||
@@ -25,6 +25,7 @@ Type* | Description
|
||||
10 | Duette and Applause SkyLift
|
||||
19 | Provenance Woven Wood
|
||||
31, 32, 84 | Vignette
|
||||
39 | Parkland
|
||||
42 | M25T Roller Blind
|
||||
49 | AC Roller
|
||||
52 | Banded Shades
|
||||
@@ -61,22 +62,21 @@ Installation can be done using [HACS](https://hacs.xyz/) by [adding a custom rep
|
||||
1. In the HA UI go to "Configuration" -> "Integrations" click "+" and search for "Hunter Douglas PowerView (BLE)"
|
||||
|
||||
## Set the Encryption Key
|
||||
Currently, there are three methods to optain the key:
|
||||
Currently, there are three methods to obtain the key:
|
||||
|
||||
1. Via adopting a BLE shade: There is a [shade emulator](/emu/PV_BLE_cover) that works with Arduino IDE and an ESP32 device (≥ 2MiB flash, ≥ 128KiB required), e.g. [Adafruit QT Py ESP32-S3](https://www.adafruit.com/product/5426). Install and connect via serial port, then go to the PowerView app and add the shade `myPVcover` to your home. You will see a log message `set shade key: \xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx\xx` . Copy this key. You can delete the shade from the app when done.
|
||||
2. Extracting from gateway: This [PR](https://github.com/patman15/hdpv_ble/pull/2) proposes a script to extract the key from a working PowerView gateway.
|
||||
3. Grabing from the app: Checkout this [post in the Home Assistant community forum](https://community.home-assistant.io/t/hunter-douglas-powerview-gen-3-integration/424836/228).
|
||||
2. Extracting from gateway: This [script](scripts/extract_gateway3_homekey.py) is able to extract the key from a working PowerView gateway.
|
||||
3. Grabbing from the app: Checkout this [post in the Home Assistant community forum](https://community.home-assistant.io/t/hunter-douglas-powerview-gen-3-integration/424836/228).
|
||||
|
||||
Finally, you need to manually copy the key to [`const.py`](https://github.com/patman15/hdpv_ble/blob/main/custom_components/hunterdouglas_powerview_ble/const.py).
|
||||
|
||||
> [!IMPORTANT]
|
||||
> You need to update the file after **each** update!
|
||||
|
||||
## Outlook
|
||||
- Add support for encryption
|
||||
- Allow parallel usage to PowerView app as "remote"
|
||||
- Add support for tilt function
|
||||
- Add support for further device types
|
||||
## Known Issues
|
||||
<details><summary>Shade inoperable after charging</summary>
|
||||
It seems that the shades require some re-initialization after charging. The solution is currently unknown, but as a workaround you can operate the shade ones using the vendor app.
|
||||
</details>
|
||||
|
||||
## Troubleshooting
|
||||
In case you have severe troubles,
|
||||
@@ -86,6 +86,15 @@ In case you have severe troubles,
|
||||
- disable the log (Home Assistant will prompt you to download the log), and finally
|
||||
- [open an issue](https://github.com/patman15/hdpv_ble/issues/new?assignees=&labels=Bug&projects=&template=bug.yml) with a good description of what happened and attach the log.
|
||||
|
||||
# Thanks To
|
||||
[@mannkind](https://github.com/mannkind)
|
||||
|
||||
[license-shield]: https://img.shields.io/github/license/patman15/hdpv_ble.svg?style=for-the-badge
|
||||
[releases-shield]: https://img.shields.io/github/release/patman15/hdpv_ble.svg?style=for-the-badge
|
||||
[releases]: https://github.com//patman15/hdpv_ble/releases
|
||||
|
||||
## Outlook
|
||||
- Add tests!
|
||||
- Allow parallel usage to PowerView app as "remote"
|
||||
- Add support for tilt function
|
||||
- Add support for further device types
|
||||
|
||||
@@ -4,17 +4,24 @@
|
||||
@license: Apache-2.0 license
|
||||
"""
|
||||
|
||||
from bleak.backends.device import BLEDevice
|
||||
from bleak.exc import BleakError
|
||||
|
||||
from homeassistant.components.bluetooth import async_ble_device_from_address
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import Platform
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
|
||||
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .const import LOGGER
|
||||
from .coordinator import PVCoordinator
|
||||
|
||||
PLATFORMS: list[Platform] = [Platform.BINARY_SENSOR, Platform.COVER, Platform.SENSOR, Platform.BUTTON]
|
||||
PLATFORMS: list[Platform] = [
|
||||
Platform.BINARY_SENSOR,
|
||||
Platform.COVER,
|
||||
Platform.SENSOR,
|
||||
Platform.BUTTON,
|
||||
]
|
||||
|
||||
type ConfigEntryType = ConfigEntry[PVCoordinator]
|
||||
|
||||
@@ -26,7 +33,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool
|
||||
if entry.unique_id is None:
|
||||
raise ConfigEntryError("Missing unique ID for device.")
|
||||
|
||||
ble_device = async_ble_device_from_address(
|
||||
ble_device: BLEDevice | None = async_ble_device_from_address(
|
||||
hass=hass, address=entry.unique_id, connectable=True
|
||||
)
|
||||
|
||||
@@ -41,8 +48,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntryType) -> bool
|
||||
except BleakError as err:
|
||||
raise ConfigEntryNotReady("Unable to query device info.") from err
|
||||
|
||||
# Insert the coordinator in the global registry
|
||||
hass.data.setdefault(DOMAIN, {})
|
||||
entry.runtime_data = coordinator
|
||||
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
entry.async_on_unload(coordinator.async_start())
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
"""Hunter Douglas PowerView BLE API."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
import time
|
||||
from typing import Final
|
||||
|
||||
from bleak import BleakClient
|
||||
@@ -12,8 +12,15 @@ from bleak.exc import BleakError
|
||||
from bleak.uuids import normalize_uuid_str
|
||||
from bleak_retry_connector import establish_connection
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from cryptography.hazmat.primitives.ciphers.base import CipherContext
|
||||
from homeassistant.components.cover import ATTR_CURRENT_POSITION
|
||||
from cryptography.hazmat.primitives.ciphers.base import (
|
||||
AEADDecryptionContext,
|
||||
AEADEncryptionContext,
|
||||
)
|
||||
|
||||
from homeassistant.components.cover import (
|
||||
ATTR_CURRENT_POSITION,
|
||||
ATTR_CURRENT_TILT_POSITION,
|
||||
)
|
||||
|
||||
from .const import LOGGER, TIMEOUT
|
||||
|
||||
@@ -26,6 +33,7 @@ ATTR_ACTIVITY: Final[str] = "activity"
|
||||
|
||||
|
||||
SHADE_TYPE: Final[dict[int, str]] = {
|
||||
# up down only
|
||||
1: "Designer Roller",
|
||||
4: "Roman",
|
||||
5: "Bottom Up",
|
||||
@@ -39,6 +47,15 @@ SHADE_TYPE: Final[dict[int, str]] = {
|
||||
52: "Banded Shades",
|
||||
53: "Sonnette",
|
||||
84: "Vignette",
|
||||
# top down bottom up
|
||||
8: "Duette, Top Down Bottom Up",
|
||||
9: "Duette DuoLite, Top Down Bottom Up",
|
||||
33: "Duette Architella, Top Down Bottom Up",
|
||||
39: "Parkland",
|
||||
47: "Pleated, Top Down Bottom Up",
|
||||
# top down, tilt anywhere
|
||||
51: "Venetian, Tilt Anywhere",
|
||||
62: "Venetian, Tilt Anywhere",
|
||||
}
|
||||
|
||||
OPEN_POSITION: Final[int] = 100
|
||||
@@ -88,18 +105,18 @@ class PowerViewBLE:
|
||||
disconnected_callback=self._on_disconnect,
|
||||
services=[
|
||||
UUID_COV_SERVICE,
|
||||
# self.UUID_DEV_SERVICE,
|
||||
UUID_DEV_SERVICE,
|
||||
# self.UUID_BAT_SERVICE,
|
||||
],
|
||||
)
|
||||
self._data_event = asyncio.Event()
|
||||
self._data: bytearray
|
||||
self._data: bytes = b""
|
||||
self._info: PVDeviceInfo = PVDeviceInfo()
|
||||
self._cmd_lock: Final = asyncio.Lock()
|
||||
self._cmd_next = None
|
||||
self._is_encrypted: bool = False
|
||||
self._cipher: Final = (
|
||||
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
|
||||
self._cmd_lock: Final = asyncio.Lock()
|
||||
self._cmd_next: tuple[ShadeCmd, bytes]
|
||||
self._cipher: Final[Cipher | None] = (
|
||||
Cipher(algorithms.AES(home_key), modes.CTR(bytes(16)))
|
||||
if len(home_key) == 16
|
||||
else None
|
||||
)
|
||||
@@ -128,9 +145,7 @@ class PowerViewBLE:
|
||||
return self._client.is_connected
|
||||
|
||||
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
|
||||
async def _cmd(
|
||||
self, cmd: tuple[ShadeCmd, bytearray], disconnect: bool = True
|
||||
) -> None:
|
||||
async def _cmd(self, cmd: tuple[ShadeCmd, bytes], disconnect: bool = True) -> None:
|
||||
self._cmd_next = cmd
|
||||
if self._cmd_lock.locked():
|
||||
LOGGER.debug("%s: device busy, queuing %s command", self.name, cmd[0])
|
||||
@@ -139,17 +154,15 @@ class PowerViewBLE:
|
||||
async with self._cmd_lock:
|
||||
try:
|
||||
await self._connect()
|
||||
cmd_run = self._cmd_next
|
||||
tx_data = (
|
||||
bytearray(
|
||||
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
|
||||
+ bytes([self._seqcnt, len(cmd_run[1])])
|
||||
)
|
||||
cmd_run: tuple[ShadeCmd, bytes] = self._cmd_next
|
||||
tx_data: bytes = bytes(
|
||||
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
|
||||
+ bytes([self._seqcnt, len(cmd_run[1])])
|
||||
+ cmd_run[1]
|
||||
)
|
||||
LOGGER.debug("sending cmd: %s", tx_data.hex(" "))
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
enc = self._cipher.encryptor()
|
||||
enc: AEADEncryptionContext = self._cipher.encryptor()
|
||||
tx_data = enc.update(tx_data) + enc.finalize()
|
||||
LOGGER.debug(" encrypted: %s", tx_data.hex(" "))
|
||||
self._data_event.clear()
|
||||
@@ -174,13 +187,13 @@ class PowerViewBLE:
|
||||
if len(data) != 9:
|
||||
LOGGER.debug("not a V2 record!")
|
||||
return []
|
||||
pos = int.from_bytes(data[3:5], byteorder="little")
|
||||
pos2 = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||
pos: Final[int] = int.from_bytes(data[3:5], byteorder="little")
|
||||
pos2: Final[int] = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||
return [
|
||||
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
|
||||
("position2", pos2 >> 2),
|
||||
("position3", int(data[6])),
|
||||
("tilt", int(data[7])),
|
||||
(ATTR_CURRENT_TILT_POSITION, int(data[7])),
|
||||
("home_id", int.from_bytes(data[0:2], byteorder="little")),
|
||||
("type_id", int(data[2])),
|
||||
("is_opening", bool(pos & 0x3 == 0x2)),
|
||||
@@ -192,16 +205,33 @@ class PowerViewBLE:
|
||||
]
|
||||
|
||||
# position cmd: uint16_t pos1, uint16_t pos2, uint16_t pos3, uint16_t tilt, uint8_t velocity
|
||||
async def set_position(self, value: int, disconnect: bool = True) -> None:
|
||||
async def set_position(
|
||||
self,
|
||||
pos1: int,
|
||||
pos2: int = 0x8000,
|
||||
pos3: int = 0x8000,
|
||||
tilt: int = 0x8000,
|
||||
velocity: int = 0x0,
|
||||
disconnect: bool = True,
|
||||
) -> None:
|
||||
"""Set position of device."""
|
||||
LOGGER.debug("%s setting position to %i", self.name, value)
|
||||
LOGGER.debug(
|
||||
"%s setting position to %i/%i/%i, tilt %i, velocity %s",
|
||||
self.name,
|
||||
pos1,
|
||||
pos2,
|
||||
pos3,
|
||||
tilt,
|
||||
velocity,
|
||||
)
|
||||
await self._cmd(
|
||||
(
|
||||
ShadeCmd.SET_POSITION,
|
||||
bytearray(
|
||||
int.to_bytes(value * 100, 2, byteorder="little")
|
||||
+ bytes([0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x0])
|
||||
),
|
||||
int.to_bytes(pos1*100, 2, byteorder="little")
|
||||
+ int.to_bytes(pos2, 2, byteorder="little")
|
||||
+ int.to_bytes(pos3, 2, byteorder="little")
|
||||
+ int.to_bytes(tilt, 2, byteorder="little")
|
||||
+ int.to_bytes(velocity, 1),
|
||||
),
|
||||
disconnect,
|
||||
)
|
||||
@@ -214,7 +244,7 @@ class PowerViewBLE:
|
||||
async def stop(self) -> None:
|
||||
"""Stop device movement."""
|
||||
LOGGER.debug("%s stop", self.name)
|
||||
await self._cmd((ShadeCmd.STOP, bytearray()))
|
||||
await self._cmd((ShadeCmd.STOP, b""))
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Fully close cover."""
|
||||
@@ -230,19 +260,19 @@ class PowerViewBLE:
|
||||
await self._cmd(
|
||||
(
|
||||
ShadeCmd.ACTIVATE_SCENE,
|
||||
bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])),
|
||||
int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2]),
|
||||
),
|
||||
)
|
||||
|
||||
async def identify(self, beeps: int = 0x3) -> None:
|
||||
"""Identify device."""
|
||||
LOGGER.debug("%s identify (%i)", self.name, beeps)
|
||||
await self._cmd((ShadeCmd.IDENTIFY, bytearray([min(beeps, 0xFF)])))
|
||||
await self._cmd((ShadeCmd.IDENTIFY, bytes([min(beeps, 0xFF)])))
|
||||
|
||||
def _verify_response(self, data: bytearray, seq_nr: int, cmd: ShadeCmd) -> bool:
|
||||
def _verify_response(self, data: bytes, seq_nr: int, cmd: ShadeCmd) -> bool:
|
||||
"""Verify shade response data."""
|
||||
if len(data) < 4:
|
||||
LOGGER.error("Reponse message too short")
|
||||
LOGGER.error("Response message too short")
|
||||
return False
|
||||
if int.from_bytes(data[0:2], byteorder="little") != cmd.value & 0xFFEF:
|
||||
LOGGER.warning("Response to wrong command")
|
||||
@@ -283,6 +313,9 @@ class PowerViewBLE:
|
||||
.copy()
|
||||
.decode("UTF-8")
|
||||
)
|
||||
except BleakError as ex:
|
||||
LOGGER.debug("%s: querying failed: %s", self.name, ex)
|
||||
raise
|
||||
finally:
|
||||
await self.disconnect()
|
||||
LOGGER.debug("%s device data: %s", self.name, data)
|
||||
@@ -295,11 +328,15 @@ class PowerViewBLE:
|
||||
|
||||
def _notification_handler(self, _sender, data: bytearray) -> None:
|
||||
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
|
||||
self._data = data
|
||||
self._data = bytes(data)
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
dec: CipherContext = self._cipher.decryptor()
|
||||
self._data = bytearray(dec.update(data) + dec.finalize())
|
||||
LOGGER.debug("%s %s", "decoded data: ".rjust(19+len(self.name)), self._data.hex(" "))
|
||||
dec: AEADDecryptionContext = self._cipher.decryptor()
|
||||
self._data = bytes(dec.update(bytes(data)) + dec.finalize())
|
||||
LOGGER.debug(
|
||||
"%s %s",
|
||||
"decoded data: ".rjust(19 + len(self.name)),
|
||||
self._data.hex(" "),
|
||||
)
|
||||
|
||||
self._data_event.set()
|
||||
|
||||
@@ -320,7 +357,7 @@ class PowerViewBLE:
|
||||
disconnected_callback=self._on_disconnect,
|
||||
services=[
|
||||
UUID_COV_SERVICE,
|
||||
# self.UUID_DEV_SERVICE,
|
||||
UUID_DEV_SERVICE,
|
||||
# self.UUID_BAT_SERVICE,
|
||||
],
|
||||
)
|
||||
|
||||
@@ -49,7 +49,7 @@ class PVBinarySensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], BinarySen
|
||||
descr: BinarySensorEntityDescription,
|
||||
unique_id: str,
|
||||
) -> None:
|
||||
"""Intialize PV binary sensor."""
|
||||
"""Initialize PV binary sensor."""
|
||||
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
|
||||
self._attr_device_info = coord.device_info
|
||||
self._attr_has_entity_name = True
|
||||
|
||||
@@ -4,6 +4,7 @@ from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.bluetooth import (
|
||||
BluetoothServiceInfoBleak,
|
||||
@@ -11,9 +12,11 @@ from homeassistant.components.bluetooth import (
|
||||
)
|
||||
from homeassistant.config_entries import ConfigFlowResult
|
||||
from homeassistant.const import CONF_ADDRESS
|
||||
|
||||
# from homeassistant.helpers.device_registry import format_mac
|
||||
from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig
|
||||
from homeassistant.helpers.selector import (
|
||||
SelectOptionDict,
|
||||
SelectSelector,
|
||||
SelectSelectorConfig,
|
||||
)
|
||||
|
||||
from .api import UUID_COV_SERVICE as UUID
|
||||
from .const import DOMAIN, LOGGER, MFCT_ID
|
||||
@@ -63,7 +66,11 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
if user_input is not None:
|
||||
return self.async_create_entry(
|
||||
title=self._discovered_device.name,
|
||||
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
|
||||
data={
|
||||
"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[
|
||||
MFCT_ID
|
||||
].hex()
|
||||
},
|
||||
)
|
||||
|
||||
self._set_confirm_only()
|
||||
@@ -89,7 +96,11 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
|
||||
return self.async_create_entry(
|
||||
title=self._discovered_device.name,
|
||||
data={"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[MFCT_ID].hex()},
|
||||
data={
|
||||
"manufacturer_data": self._discovered_device.discovery_info.manufacturer_data[
|
||||
MFCT_ID
|
||||
].hex()
|
||||
},
|
||||
)
|
||||
|
||||
current_addresses = self._async_current_ids()
|
||||
@@ -111,7 +122,7 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
if not self._discovered_devices:
|
||||
return self.async_abort(reason="no_devices_found")
|
||||
|
||||
titles = []
|
||||
titles: list[SelectOptionDict] = []
|
||||
for address, discovery in self._discovered_devices.items():
|
||||
titles.append({"value": address, "label": discovery.name})
|
||||
|
||||
|
||||
@@ -3,16 +3,6 @@
|
||||
import logging
|
||||
from typing import Final
|
||||
|
||||
# from bleak.uuids import normalize_uuid_str
|
||||
|
||||
# from homeassistant.const import ( # noqa: F401
|
||||
# ATTR_BATTERY_CHARGING,
|
||||
# ATTR_BATTERY_LEVEL,
|
||||
# ATTR_TEMPERATURE,
|
||||
# ATTR_VOLTAGE,
|
||||
# )
|
||||
|
||||
|
||||
DOMAIN: Final[str] = "hunterdouglas_powerview_ble"
|
||||
LOGGER: Final = logging.getLogger(__package__)
|
||||
MFCT_ID: Final[int] = 2073
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
from typing import Any
|
||||
|
||||
from bleak.backends.device import BLEDevice
|
||||
|
||||
from homeassistant.components import bluetooth
|
||||
from homeassistant.components.bluetooth.const import DOMAIN as BLUETOOTH_DOMAIN
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
@@ -80,7 +81,7 @@ class PVCoordinator(PassiveBluetoothDataUpdateCoordinator):
|
||||
|
||||
def _async_stop(self) -> None:
|
||||
"""Shutdown coordinator and any connection."""
|
||||
LOGGER.debug("%s: shuting down BMS device", self.name)
|
||||
LOGGER.debug("%s: shutting down BMS device", self.name)
|
||||
self.hass.async_create_task(self.api.disconnect())
|
||||
super()._async_stop()
|
||||
|
||||
|
||||
@@ -3,12 +3,15 @@
|
||||
from typing import Any, Final
|
||||
|
||||
from bleak.exc import BleakError
|
||||
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
PassiveBluetoothCoordinatorEntity,
|
||||
)
|
||||
from homeassistant.components.cover import (
|
||||
ATTR_CURRENT_POSITION,
|
||||
ATTR_CURRENT_TILT_POSITION,
|
||||
ATTR_POSITION,
|
||||
ATTR_TILT_POSITION,
|
||||
CoverDeviceClass,
|
||||
CoverEntity,
|
||||
CoverEntityFeature,
|
||||
@@ -31,11 +34,18 @@ async def async_setup_entry(
|
||||
"""Set up the demo cover platform."""
|
||||
|
||||
coordinator: PVCoordinator = config_entry.runtime_data
|
||||
async_add_entities([PowerViewCover(coordinator)])
|
||||
model: Final[str|None] = coordinator.dev_details.get("model")
|
||||
entities: list[PowerViewCover] = []
|
||||
if model in ["39"]:
|
||||
entities.append(PowerViewCoverTiltOnly(coordinator))
|
||||
else:
|
||||
entities.append(PowerViewCover(coordinator))
|
||||
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Representation of a powerview shade."""
|
||||
"""Representation of a PowerView shade with Up/Down functionality only."""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
_attr_device_class = CoverDeviceClass.SHADE
|
||||
@@ -51,8 +61,9 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
coordinator: PVCoordinator,
|
||||
) -> None:
|
||||
"""Initialize the shade."""
|
||||
LOGGER.debug("%s: init() PowerViewCover", coordinator.name)
|
||||
self._attr_name = CoverDeviceClass.SHADE
|
||||
self._coord = coordinator
|
||||
self._coord: PVCoordinator = coordinator
|
||||
self._attr_device_info = self._coord.device_info
|
||||
self._target_position: int | None = round(
|
||||
self._coord.data.get(ATTR_CURRENT_POSITION, OPEN_POSITION)
|
||||
@@ -170,3 +181,117 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error("Failed to stop cover '%s': %s", self.name, err)
|
||||
|
||||
|
||||
class PowerViewCoverTilt(PowerViewCover):
|
||||
"""Representation of a PowerView shade with additional tilt functionality."""
|
||||
|
||||
_attr_supported_features = (
|
||||
CoverEntityFeature.OPEN
|
||||
| CoverEntityFeature.CLOSE
|
||||
| CoverEntityFeature.STOP
|
||||
| CoverEntityFeature.SET_POSITION
|
||||
| CoverEntityFeature.OPEN_TILT
|
||||
| CoverEntityFeature.CLOSE_TILT
|
||||
| CoverEntityFeature.STOP_TILT
|
||||
| CoverEntityFeature.SET_TILT_POSITION
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: PVCoordinator,
|
||||
) -> None:
|
||||
"""Initialize the shade with tilt."""
|
||||
LOGGER.debug("%s: init() PowerViewCoverTilt", coordinator.name)
|
||||
super().__init__(coordinator)
|
||||
|
||||
@property
|
||||
def current_cover_tilt_position(self) -> int | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return current tilt of cover.
|
||||
|
||||
None is unknown
|
||||
"""
|
||||
pos: Final = self._coord.data.get(ATTR_CURRENT_TILT_POSITION)
|
||||
return round(pos) if pos is not None else None
|
||||
|
||||
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
|
||||
"""Move the tilt to a specific position."""
|
||||
|
||||
if isinstance(target_position := kwargs.get(ATTR_TILT_POSITION), int):
|
||||
LOGGER.debug("set cover tilt to position %i", target_position)
|
||||
if (
|
||||
self.current_cover_tilt_position == round(target_position)
|
||||
or self.current_cover_position is None
|
||||
):
|
||||
return
|
||||
|
||||
try:
|
||||
await self._coord.api.set_position(
|
||||
self.current_cover_position, tilt=target_position
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error(
|
||||
"Failed to tilt cover '%s' to %f%%: %s",
|
||||
self.name,
|
||||
target_position,
|
||||
err,
|
||||
)
|
||||
|
||||
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Stop the cover."""
|
||||
await self.async_stop_cover(kwargs=kwargs)
|
||||
|
||||
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Open the cover tilt."""
|
||||
LOGGER.debug("open cover tilt")
|
||||
_kwargs = {**kwargs, ATTR_TILT_POSITION: OPEN_POSITION}
|
||||
await self.async_set_cover_tilt_position(**_kwargs)
|
||||
|
||||
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Close the cover tilt."""
|
||||
LOGGER.debug("close cover tilt")
|
||||
_kwargs = {**kwargs, ATTR_TILT_POSITION: CLOSED_POSITION}
|
||||
await self.async_set_cover_tilt_position(**_kwargs)
|
||||
|
||||
|
||||
class PowerViewCoverTiltOnly(PowerViewCoverTilt):
|
||||
"""Representation of a PowerView shade with additional tilt functionality."""
|
||||
|
||||
OPENCLOSED_THRESHOLD = 5
|
||||
|
||||
_attr_device_class = CoverDeviceClass.BLIND
|
||||
_attr_supported_features = (
|
||||
CoverEntityFeature.OPEN_TILT
|
||||
| CoverEntityFeature.CLOSE_TILT
|
||||
| CoverEntityFeature.STOP_TILT
|
||||
| CoverEntityFeature.SET_TILT_POSITION
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: PVCoordinator,
|
||||
) -> None:
|
||||
"""Initialize the shade with tilt only."""
|
||||
LOGGER.debug("%s: init() PowerViewCoverTiltOnly", coordinator.name)
|
||||
super().__init__(coordinator)
|
||||
|
||||
@property
|
||||
def is_opening(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return if the cover is opening or not."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def is_closing(self) -> bool | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return if the cover is closing or not."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def is_closed(self) -> bool: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return if the cover is closed."""
|
||||
return isinstance(self.current_cover_tilt_position, int) and (
|
||||
self.current_cover_tilt_position
|
||||
>= OPEN_POSITION - PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
|
||||
or self.current_cover_tilt_position
|
||||
<= CLOSED_POSITION + PowerViewCoverTiltOnly.OPENCLOSED_THRESHOLD
|
||||
)
|
||||
|
||||
@@ -4,8 +4,7 @@
|
||||
"bluetooth": [
|
||||
{
|
||||
"service_uuid": "0000fdc1-0000-1000-8000-00805f9b34fb",
|
||||
"manufacturer_id": 2073,
|
||||
"manufacturer_data_start": [0,0]
|
||||
"manufacturer_id": 2073
|
||||
}
|
||||
],
|
||||
"codeowners": ["@patman15"],
|
||||
@@ -17,5 +16,5 @@
|
||||
"issue_tracker": "https://github.com/patman15/hdpv_ble/issues",
|
||||
"loggers": ["hunterdouglas_powerview_ble"],
|
||||
"requirements": ["cryptography>=43.0.0"],
|
||||
"version": "0.22"
|
||||
"version": "0.23"
|
||||
}
|
||||
|
||||
@@ -3,14 +3,8 @@
|
||||
from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
PassiveBluetoothCoordinatorEntity,
|
||||
)
|
||||
from homeassistant.components.sensor import (
|
||||
SensorEntity,
|
||||
SensorEntityDescription,
|
||||
)
|
||||
from homeassistant.components.sensor.const import (
|
||||
SensorDeviceClass,
|
||||
SensorStateClass,
|
||||
)
|
||||
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
|
||||
from homeassistant.components.sensor.const import SensorDeviceClass, SensorStateClass
|
||||
from homeassistant.const import (
|
||||
ATTR_BATTERY_LEVEL,
|
||||
PERCENTAGE,
|
||||
@@ -67,7 +61,7 @@ class PVSensor(PassiveBluetoothCoordinatorEntity[PVCoordinator], SensorEntity):
|
||||
def __init__(
|
||||
self, pv_dev: PVCoordinator, descr: SensorEntityDescription, unique_id: str
|
||||
) -> None:
|
||||
"""Intitialize the BMS sensor."""
|
||||
"""Initialize the BMS sensor."""
|
||||
self._attr_unique_id = f"{DOMAIN}-{unique_id}-{descr.key}"
|
||||
self._attr_device_info = pv_dev.device_info
|
||||
self.entity_description = descr
|
||||
|
||||
@@ -2,6 +2,12 @@
|
||||
* Emulate a Hunter Douglas PowerView cover device using ESP32
|
||||
* used e.g. to gain the home_key from an existing installation via BLE
|
||||
*
|
||||
* REQUIRES:
|
||||
* - ESP32 Board Definitions 3.0.x (tested on 3.0.7)
|
||||
* - WolfSSL 5.7.x (tested on 5.7.6)
|
||||
* - Phone Region: Potentially an alternative region depending on the app response
|
||||
* - e.g. To add Parkland shades in the US, phone region set to the UK temporarily
|
||||
*
|
||||
* TODO:
|
||||
* - cleanup code
|
||||
* - think about emulating a remote
|
||||
@@ -11,9 +17,13 @@
|
||||
*/
|
||||
|
||||
#define NAME "myPVcover"
|
||||
#define FW_VERSION "391"
|
||||
#define SERIAL_NR "01234567890ABCDEF"
|
||||
|
||||
const uint16_t SW_VERSION = 391;
|
||||
const char *SERIAL_NR = "01234567890ABCDEF";
|
||||
const uint16_t TYP_ID = 42; // 62
|
||||
const uint16_t MODEL_ID = 224;
|
||||
const uint16_t FW_REVISION = 27;
|
||||
const uint32_t HW_REVISION = 171103;
|
||||
const uint8_t BATTERY_LEVEL = 42;
|
||||
|
||||
#include <BLEDevice.h>
|
||||
#include <BLEServer.h>
|
||||
@@ -40,7 +50,11 @@ int devId = INVALID_DEVID; //if not using async INVALID_DEVID is default
|
||||
|
||||
#define DEV_SERVICE_UUID BLEUUID("180A")
|
||||
#define SER_CHAR_UUID BLEUUID("2A25")
|
||||
#define SWC_CHAR_UUID BLEUUID("2A28")
|
||||
#define MAN_CHAR_UUID BLEUUID("2A29")
|
||||
#define MOD_CHAR_UUID BLEUUID("2A24")
|
||||
#define FWR_CHAR_UUID BLEUUID("2A26")
|
||||
#define HWR_CHAR_UUID BLEUUID("2A27")
|
||||
#define SWR_CHAR_UUID BLEUUID("2A28")
|
||||
|
||||
#define BAT_SERVICE_UUID BLEUUID("180F")
|
||||
#define BAT_CHAR_UUID BLEUUID("2A19")
|
||||
@@ -69,7 +83,7 @@ struct notification {
|
||||
};
|
||||
|
||||
BLECharacteristic *pCharacteristic_cover, *pCharacteristic_fw, *pCharacteristic_unknown, *pCharacteristic_bat;
|
||||
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser;
|
||||
BLECharacteristic *pCharacteristic_dev, *pCharacteristic_ser, *pCharacteristic_man, *pCharacteristic_mod, *pCharacteristic_fwr, *pCharacteristic_hwr;
|
||||
BLEServer *pServer = NULL;
|
||||
bool deviceConnected = false;
|
||||
bool oldDeviceConnected = false;
|
||||
@@ -158,12 +172,12 @@ void decode(BLECharacteristic *pChar) {
|
||||
memcpy((void *)&msg, data_dec, 4);
|
||||
Serial.printf("\t message: SRV: %02x, CMD %02x, SEQ %i, LEN %i\n", msg.serviceID, msg.cmdID, msg.sequence, msg.data_len);
|
||||
|
||||
// sepecial responses (static data!)
|
||||
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
|
||||
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x2a, 0xe0, 0x08 }; // HW diagnostics
|
||||
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
|
||||
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
|
||||
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
|
||||
// special responses (static data!)
|
||||
const byte ret_valF1DD[] = { 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x87, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // product info
|
||||
const byte ret_valFFDD[] = { 0x00, 0x05, 0xd1, 0xa2, 0x9a, 0x42, 0x59, 0x5d, 0x5c, 0x52, 0x1b, 0x00, 0x00, 0x00, (uint8_t)(SW_VERSION & 0xFF), (uint8_t)(SW_VERSION >> 8), 0x00, 0x00, 0x5f, 0x9c, 0x02, 0x00, 0x5f, 0x9c, 0x02, 0x00, TYP_ID, MODEL_ID, 0x08 }; // HW diagnostics
|
||||
const byte ret_valFFDE[] = { 0x08, 0x00, 0x02, 0x26, 0x72, 0x01, 0x59, 0x01, 0x00 }; // power status
|
||||
const byte ret_valFA5B[] = { 0x00, 0x0a, 0xa2, 0x88, 0x13, 0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 }; // get scene
|
||||
const byte ret_valFA5A[] = { 0x00, 0x02, 0xb0 }; // set scene
|
||||
|
||||
Serial.print("\t\t");
|
||||
switch ((msg.serviceID << 8) | msg.cmdID) {
|
||||
@@ -209,7 +223,7 @@ void decode(BLECharacteristic *pChar) {
|
||||
// set shade key
|
||||
Serial.print("set shade key: ");
|
||||
print_hex(&data_raw[4], data_len - 4, "\\x", "");
|
||||
// set resonse before key, to acknowledge unencrypted
|
||||
// set response before key, to acknowledge unencrypted
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
if (msg.data_len == 16) {
|
||||
memcpy(home_key, &data_raw[4], 16);
|
||||
@@ -251,7 +265,9 @@ void decode(BLECharacteristic *pChar) {
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
break;
|
||||
default:
|
||||
Serial.println(F("*********************************** unknown message"));
|
||||
Serial.println(F("*********************************** unknown message (try ACK)"));
|
||||
resp_size = set_response(&response, (const message *)data_dec);
|
||||
break;
|
||||
}
|
||||
if (resp_size) {
|
||||
pChar->setValue((uint8_t *)&response, resp_size);
|
||||
@@ -291,7 +307,7 @@ class coverCallbacks : public BLECharacteristicCallbacks {
|
||||
void onNotify(BLECharacteristic *pCharacteristic) {
|
||||
Serial.printf("Cover onNotify() %s\n", pCharacteristic->toString().c_str());
|
||||
}
|
||||
|
||||
|
||||
void onStatus(BLECharacteristic *pCharacteristic, Status s, uint32_t code) {
|
||||
Serial.printf("Cover onStatus() %s: %s\n", BLEstate[s], pCharacteristic->toString().c_str());
|
||||
}
|
||||
@@ -341,6 +357,7 @@ class genericCallbacks : public BLECharacteristicCallbacks {
|
||||
|
||||
void setup() {
|
||||
Serial.begin(115200);
|
||||
delay(1000); // wait for terminal to be ready
|
||||
Serial.println(NAME " initializing ...");
|
||||
|
||||
BLEDevice::init(NAME);
|
||||
@@ -373,8 +390,7 @@ pDesc1->setValue("cover");*/
|
||||
BAT_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_bat->setCallbacks(new batteryCallbacks());
|
||||
uint8_t battery_level = 42;
|
||||
pCharacteristic_bat->setValue(&battery_level, 1);
|
||||
pCharacteristic_bat->setValue((uint8_t *)&BATTERY_LEVEL, 1);
|
||||
pCharacteristic_bat->addDescriptor(new BLE2902());
|
||||
|
||||
BLEService *pFWService = pServer->createService(FW_SERVICE_UUID);
|
||||
@@ -390,9 +406,9 @@ pDesc1->setValue("cover");*/
|
||||
|
||||
BLEService *pDEVService = pServer->createService(DEV_SERVICE_UUID);
|
||||
pCharacteristic_dev = pDEVService->createCharacteristic(
|
||||
SWC_CHAR_UUID,
|
||||
SWR_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_dev->setValue(FW_VERSION);
|
||||
pCharacteristic_dev->setValue(String(SW_VERSION));
|
||||
pCharacteristic_dev->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_ser = pDEVService->createCharacteristic(
|
||||
SER_CHAR_UUID,
|
||||
@@ -400,6 +416,28 @@ pDesc1->setValue("cover");*/
|
||||
pCharacteristic_ser->setValue(SERIAL_NR);
|
||||
pCharacteristic_ser->setCallbacks(new genericCallbacks());
|
||||
|
||||
pCharacteristic_man = pDEVService->createCharacteristic(
|
||||
MAN_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_man->setValue("Hunter Douglas");
|
||||
pCharacteristic_man->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_mod = pDEVService->createCharacteristic(
|
||||
MOD_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_mod->setValue(String(TYP_ID));
|
||||
pCharacteristic_mod->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_fwr = pDEVService->createCharacteristic(
|
||||
FWR_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_fwr->setValue(String(FW_REVISION));
|
||||
pCharacteristic_fwr->setCallbacks(new genericCallbacks());
|
||||
pCharacteristic_hwr = pDEVService->createCharacteristic(
|
||||
HWR_CHAR_UUID,
|
||||
BLECharacteristic::PROPERTY_READ);
|
||||
pCharacteristic_hwr->setValue(String(HW_REVISION));
|
||||
pCharacteristic_hwr->setCallbacks(new genericCallbacks());
|
||||
|
||||
|
||||
// Start the services
|
||||
pCovService->start();
|
||||
pFWService->start();
|
||||
@@ -408,9 +446,10 @@ pDesc1->setValue("cover");*/
|
||||
|
||||
// Start advertising
|
||||
BLEAdvertisementData AdvertisementData;
|
||||
const String manufacturerData = String("\x19\x08\x00\x00\x2A\x00\x00\x00\x00\x00\xA2", 11);
|
||||
// Hunter Douglas ^^--^^ ^^key^^ ^^ ID-Type
|
||||
AdvertisementData.setManufacturerData(manufacturerData);
|
||||
const char adv[] = { 0x19, 0x08, 0x00, 0x00, TYP_ID, 0x00, 0x00, 0x00, 0x00, 0x00, 0xA2 };
|
||||
// Hunter Douglas ^^ -- ^^ ^^key ^^ ^--pos1--^
|
||||
|
||||
AdvertisementData.setManufacturerData(String(adv, 11));
|
||||
AdvertisementData.setPartialServices(BLEUUID(COVER_SERVICE_UUID));
|
||||
AdvertisementData.setFlags((1 << 2) | (1 << 1)); // [BR/EDR Not Supported] | [LE General Discoverable Mode]
|
||||
|
||||
|
||||
BIN
img/1200x630wa.png
Normal file
BIN
img/1200x630wa.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 40 KiB |
BIN
img/icon.png
Normal file
BIN
img/icon.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 11 KiB |
BIN
img/icon@2x.png
Normal file
BIN
img/icon@2x.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 29 KiB |
BIN
img/icon_full.png
Normal file
BIN
img/icon_full.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 31 KiB |
BIN
img/logo.png
Normal file
BIN
img/logo.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 44 KiB |
BIN
img/logo@2x.png
Normal file
BIN
img/logo@2x.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 53 KiB |
@@ -1,7 +1,17 @@
|
||||
# pyproject.toml
|
||||
|
||||
[project]
|
||||
requires-python = ">=3.12.0"
|
||||
name = "hunterdouglas_powerview_ble"
|
||||
classifiers = [
|
||||
"License :: OSI Approved :: Apache Software License",
|
||||
"Programming Language :: Python :: 3.13"
|
||||
]
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13.2"
|
||||
|
||||
[project.urls]
|
||||
"Source Code" = "https://github.com/patman15/hdpv_ble/"
|
||||
"Bug Reports" = "https://github.com/patman15/hdpv_ble/issues"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
minversion = "8.0"
|
||||
@@ -14,9 +24,9 @@ testpaths = [
|
||||
]
|
||||
asyncio_mode = "auto"
|
||||
|
||||
# ruff configuration taken from HA 2024.11.2 (less ignores)
|
||||
# ruff settings from HA 2025.2.2
|
||||
[tool.ruff]
|
||||
required-version = ">=0.6.8"
|
||||
required-version = ">=0.9.1"
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = [
|
||||
@@ -35,8 +45,10 @@ select = [
|
||||
"B017", # pytest.raises(BaseException) should be considered evil
|
||||
"B018", # Found useless attribute access. Either assign it to a variable or remove it.
|
||||
"B023", # Function definition does not bind loop variable {name}
|
||||
"B024", # `{name}` is an abstract base class, but it has no abstract methods or properties
|
||||
"B026", # Star-arg unpacking after a keyword argument is strongly discouraged
|
||||
"B032", # Possible unintentional type annotation (using :). Did you mean to assign (using =)?
|
||||
"B035", # Dictionary comprehension uses static key
|
||||
"B904", # Use raise from to specify exception cause
|
||||
"B905", # zip() without an explicit strict= parameter
|
||||
"BLE",
|
||||
@@ -70,12 +82,27 @@ select = [
|
||||
"RSE", # flake8-raise
|
||||
"RUF005", # Consider iterable unpacking instead of concatenation
|
||||
"RUF006", # Store a reference to the return value of asyncio.create_task
|
||||
"RUF007", # Prefer itertools.pairwise() over zip() when iterating over successive pairs
|
||||
"RUF008", # Do not use mutable default values for dataclass attributes
|
||||
"RUF010", # Use explicit conversion flag
|
||||
"RUF013", # PEP 484 prohibits implicit Optional
|
||||
"RUF016", # Slice in indexed access to type {value_type} uses type {index_type} instead of an integer
|
||||
"RUF017", # Avoid quadratic list summation
|
||||
"RUF018", # Avoid assignment expressions in assert statements
|
||||
"RUF019", # Unnecessary key check before dictionary access
|
||||
# "RUF100", # Unused `noqa` directive; temporarily every now and then to clean them up
|
||||
"RUF020", # {never_like} | T is equivalent to T
|
||||
"RUF021", # Parenthesize a and b expressions when chaining and and or together, to make the precedence clear
|
||||
"RUF022", # Sort __all__
|
||||
"RUF023", # Sort __slots__
|
||||
"RUF024", # Do not pass mutable objects as values to dict.fromkeys
|
||||
"RUF026", # default_factory is a positional-only argument to defaultdict
|
||||
"RUF030", # print() call in assert statement is likely unintentional
|
||||
"RUF032", # Decimal() called with float literal argument
|
||||
"RUF033", # __post_init__ method with argument defaults
|
||||
"RUF034", # Useless if-else condition
|
||||
"RUF100", # Unused `noqa` directive
|
||||
"RUF101", # noqa directives that use redirected rule codes
|
||||
"RUF200", # Failed to parse pyproject.toml: {message}
|
||||
"S102", # Use of exec detected
|
||||
"S103", # bad-file-permissions
|
||||
"S108", # hardcoded-temp-file
|
||||
@@ -88,7 +115,7 @@ select = [
|
||||
"S317", # suspicious-xml-sax-usage
|
||||
"S318", # suspicious-xml-mini-dom-usage
|
||||
"S319", # suspicious-xml-pull-dom-usage
|
||||
"S320", # suspicious-xmle-tree-usage
|
||||
# "S320", # suspicious-xmle-tree-usage
|
||||
"S601", # paramiko-call
|
||||
"S602", # subprocess-popen-with-shell-equals-true
|
||||
"S604", # call-with-shell-equals-true
|
||||
@@ -99,7 +126,7 @@ select = [
|
||||
"SLOT", # flake8-slots
|
||||
"T100", # Trace found: {name} used
|
||||
"T20", # flake8-print
|
||||
"TCH", # flake8-type-checking
|
||||
"TC", # flake8-type-checking
|
||||
"TID", # Tidy imports
|
||||
"TRY", # tryceratops
|
||||
"UP", # pyupgrade
|
||||
@@ -119,13 +146,12 @@ ignore = [
|
||||
# "PLC1901", # {existing} can be simplified to {replacement} as an empty string is falsey; too many false positives
|
||||
# "PLR0911", # Too many return statements ({returns} > {max_returns})
|
||||
# "PLR0912", # Too many branches ({branches} > {max_branches})
|
||||
# "PLR0913", # Too many arguments to function call ({c_args} > {max_args})
|
||||
"PLR0913", # Too many arguments to function call ({c_args} > {max_args})
|
||||
# "PLR0915", # Too many statements ({statements} > {max_statements})
|
||||
"PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable
|
||||
# "PLW2901", # Outer {outer_kind} variable {name} overwritten by inner {inner_kind} target
|
||||
# "PT004", # Fixture {fixture} does not return anything, add leading underscore
|
||||
# "PT011", # pytest.raises({exception}) is too broad, set the `match` parameter or use a more specific exception
|
||||
# "PT018", # Assertion should be broken down into multiple parts
|
||||
"PT018", # Assertion should be broken down into multiple parts
|
||||
# "RUF001", # String contains ambiguous unicode character.
|
||||
# "RUF002", # Docstring contains ambiguous unicode character.
|
||||
# "RUF003", # Comment contains ambiguous unicode character.
|
||||
@@ -136,14 +162,12 @@ ignore = [
|
||||
# "SIM115", # Use context handler for opening files
|
||||
|
||||
# Moving imports into type-checking blocks can mess with pytest.patch()
|
||||
"TCH001", # Move application import {} into a type-checking block
|
||||
"TCH002", # Move third-party import {} into a type-checking block
|
||||
"TCH003", # Move standard library import {} into a type-checking block
|
||||
"TC001", # Move application import {} into a type-checking block
|
||||
"TC002", # Move third-party import {} into a type-checking block
|
||||
"TC003", # Move standard library import {} into a type-checking block
|
||||
|
||||
"TRY003", # Avoid specifying long messages outside the exception class
|
||||
"TRY400", # Use `logging.exception` instead of `logging.error`
|
||||
# Ignored due to performance: https://github.com/charliermarsh/ruff/issues/2923
|
||||
"UP038", # Use `X | Y` in `isinstance` call instead of `(X, Y)`
|
||||
|
||||
# May conflict with the formatter, https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules
|
||||
"W191",
|
||||
@@ -161,3 +185,14 @@ ignore = [
|
||||
"PLE0605"
|
||||
]
|
||||
|
||||
|
||||
[tool.ruff.lint.isort]
|
||||
force-sort-within-sections = true
|
||||
known-first-party = [
|
||||
"homeassistant",
|
||||
]
|
||||
combine-as-imports = true
|
||||
split-on-trailing-comma = false
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"scripts/*" = ["T201"]
|
||||
@@ -1,4 +1,6 @@
|
||||
homeassistant==2024.11.0
|
||||
homeassistant==2025.11.0
|
||||
pip>=21.3.1
|
||||
ruff==0.6.8
|
||||
|
||||
ruff>=0.9.1,<=0.15.0
|
||||
types-requests
|
||||
mypy~=1.19.1
|
||||
codespell
|
||||
|
||||
@@ -2,9 +2,9 @@
|
||||
|
||||
wheel
|
||||
home-assistant-bluetooth
|
||||
habluetooth>=3.6.0
|
||||
habluetooth>=5.3.0
|
||||
bluetooth-adapters
|
||||
pytest>=8.3.3
|
||||
pytest>=8.4.1
|
||||
pytest-cov>=5.0.0
|
||||
pytest-socket>=0.7.0
|
||||
pytest-asyncio>=0.24.0
|
||||
@@ -16,10 +16,10 @@ aiohttp
|
||||
aiohttp_cors
|
||||
aiohttp-fast-url-dispatcher
|
||||
aiohttp-zlib-ng
|
||||
bleak>=0.22.3
|
||||
bleak-retry-connector>=3.6.0
|
||||
bleak>=1.0.1
|
||||
bleak-retry-connector>=4.4.3
|
||||
bluetooth-data-tools
|
||||
pyserial-asyncio
|
||||
pyudev
|
||||
pytest-homeassistant-custom-component==0.13.181
|
||||
pytest-homeassistant-custom-component==0.13.294
|
||||
|
||||
|
||||
1
scripts/__init__.py
Normal file
1
scripts/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Script to extract PowerView homekey from a G3 PowerView Gateway."""
|
||||
102
scripts/extract_gateway3_homekey.py
Normal file
102
scripts/extract_gateway3_homekey.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""Extract PowerView homekey from a G3 PowerView Gateway."""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import struct
|
||||
from typing import Any, Final
|
||||
|
||||
import requests
|
||||
|
||||
HUB: Final[str] = "http://powerview-g3.local"
|
||||
TIMEOUT: Final[int] = 10
|
||||
|
||||
|
||||
def create_request(sid: int, cid: int, sequence_id: int, data: bytes) -> bytes:
|
||||
"""Assemble a request frame for the PowerView protocol."""
|
||||
return struct.pack("<BBBB", sid, cid, sequence_id, len(data)) + data
|
||||
|
||||
|
||||
def decode_response(packet: bytes) -> dict[str, Any]:
|
||||
"""Decode a response frame from the PowerView protocol."""
|
||||
if len(packet) < 4:
|
||||
raise ValueError("Packet size too small")
|
||||
sid, cid, sequence_id, length = struct.unpack("<BBBB", packet[0:4])
|
||||
if len(packet) != 4 + length:
|
||||
raise ValueError("Not all data present")
|
||||
if length < 1:
|
||||
raise ValueError("No errorCode present")
|
||||
(error_code,) = struct.unpack("<B", packet[4:5])
|
||||
data: Final[bytes] = packet[5:]
|
||||
return {
|
||||
"cid": cid,
|
||||
"sid": sid,
|
||||
"sequenceId": sequence_id,
|
||||
"errorCode": error_code,
|
||||
"data": data,
|
||||
}
|
||||
|
||||
|
||||
def create_get_shade_key_request(sequence_id) -> bytes:
|
||||
"""Create a GetShadeKey request frame."""
|
||||
return create_request(251, 18, sequence_id, b"")
|
||||
|
||||
|
||||
def get_shade_key(hub: str, ble_name) -> bytes:
|
||||
"""Get the homekey for a shade."""
|
||||
try:
|
||||
shades_exec_resp: requests.Response = requests.post(
|
||||
hub + "/home/shades/exec?shades=" + ble_name,
|
||||
json={"hex": create_get_shade_key_request(1).hex()},
|
||||
timeout=TIMEOUT,
|
||||
)
|
||||
shades_exec_resp.raise_for_status()
|
||||
except requests.exceptions.RequestException as ex:
|
||||
print(f"Unable to send GetShadeKey {ex!s}")
|
||||
raise
|
||||
|
||||
result: dict = json.loads(shades_exec_resp.content)
|
||||
if result.get("err") != 0 or len(result.get("responses", [])) != 1:
|
||||
raise OSError("Error when attempting GetShadeKey")
|
||||
response: Final[bytes] = bytes.fromhex(result["responses"][0]["hex"])
|
||||
dec_resp: Final[dict[str, Any]] = decode_response(response)
|
||||
if dec_resp["errorCode"] != 0:
|
||||
raise ValueError("BLE errorCode is not 0")
|
||||
if len(dec_resp["data"]) != 16:
|
||||
raise ValueError("Expected 16 byte homekey")
|
||||
return dec_resp["data"]
|
||||
|
||||
|
||||
def main(hub: str) -> int:
|
||||
"""Extract the homekeys from all shades."""
|
||||
try:
|
||||
shades_resp: requests.Response = requests.get(
|
||||
hub + "/home/shades", timeout=TIMEOUT
|
||||
)
|
||||
shades_resp.raise_for_status()
|
||||
except requests.exceptions.RequestException as ex:
|
||||
print(f"Unable to get list of shades:\n\t{ex!s}")
|
||||
return -1
|
||||
|
||||
shades = json.loads(shades_resp.content)
|
||||
print(f"Found {len(shades)} shades, interrogating")
|
||||
for shade in shades:
|
||||
name: str = base64.b64decode(shade["name"]).decode("utf-8")
|
||||
key: bytes = get_shade_key(hub, shade["bleName"])
|
||||
|
||||
print(f"Shade '{name}':")
|
||||
print(f"\tBLE name: '{shade['bleName']}'")
|
||||
print(f"\tHomeKey: {key.hex()}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Extract PowerView homekey from a G3 PowerView Gateway"
|
||||
)
|
||||
parser.add_argument("hub", nargs="?", help="URL to HUB", default=HUB)
|
||||
args = parser.parse_args()
|
||||
sys.exit(main(**vars(args)))
|
||||
Reference in New Issue
Block a user