Compare commits
11 Commits
hub-archit
...
feature/ty
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
49bb1e79c9 | ||
|
|
d73bb1d1c4 | ||
|
|
e00e04159a | ||
|
|
ea33e08da0 | ||
|
|
8c5c3d3c9a | ||
|
|
0299a4ed24 | ||
|
|
21d31ba38a | ||
|
|
b6b7e43b02 | ||
|
|
b550f98e2b | ||
|
|
f6460280db | ||
|
|
65b7b3814b |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -152,3 +152,6 @@ cython_debug/
|
||||
#.idea/
|
||||
aes.py
|
||||
*.bak
|
||||
emu/PV_BLE_cover/PV_BLE_cover.ino
|
||||
img/
|
||||
*.zip
|
||||
|
||||
@@ -12,7 +12,14 @@ from bleak.exc import BleakError
|
||||
from bleak.uuids import normalize_uuid_str
|
||||
from bleak_retry_connector import establish_connection
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||
from homeassistant.components.cover import ATTR_CURRENT_POSITION
|
||||
from cryptography.hazmat.primitives.ciphers.base import (
|
||||
AEADDecryptionContext,
|
||||
AEADEncryptionContext,
|
||||
)
|
||||
from homeassistant.components.cover import (
|
||||
ATTR_CURRENT_POSITION,
|
||||
ATTR_CURRENT_TILT_POSITION,
|
||||
)
|
||||
|
||||
from .const import LOGGER, TIMEOUT
|
||||
|
||||
@@ -25,6 +32,7 @@ ATTR_ACTIVITY: Final[str] = "activity"
|
||||
|
||||
|
||||
SHADE_TYPE: Final[dict[int, str]] = {
|
||||
# up down only
|
||||
1: "Designer Roller",
|
||||
4: "Roman",
|
||||
5: "Bottom Up",
|
||||
@@ -38,6 +46,14 @@ SHADE_TYPE: Final[dict[int, str]] = {
|
||||
52: "Banded Shades",
|
||||
53: "Sonnette",
|
||||
84: "Vignette",
|
||||
# top down bottom up
|
||||
8: "Duette, Top Down Bottom Up",
|
||||
9: "Duette DuoLite, Top Down Bottom Up",
|
||||
33: "Duette Architella, Top Down Bottom Up",
|
||||
47: "Pleated, Top Down Bottom Up",
|
||||
# top down, tilt anywhere
|
||||
51: "Venetian, Tilt Anywhere",
|
||||
62: "Venetian, Tilt Anywhere",
|
||||
}
|
||||
|
||||
OPEN_POSITION: Final[int] = 100
|
||||
@@ -58,6 +74,7 @@ class ShadeCmd(Enum):
|
||||
SET_POSITION = 0x01F7
|
||||
STOP = 0xB8F7
|
||||
ACTIVATE_SCENE = 0xBAF7
|
||||
IDENTIFY = 0x11F7
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -91,12 +108,12 @@ class PowerViewBLE:
|
||||
],
|
||||
)
|
||||
self._data_event = asyncio.Event()
|
||||
self._data: bytearray
|
||||
self._data: bytearray = bytearray()
|
||||
self._info: PVDeviceInfo = PVDeviceInfo()
|
||||
self._cmd_lock: Final = asyncio.Lock()
|
||||
self._cmd_next = None
|
||||
self._cmd_next: tuple[ShadeCmd, bytes]
|
||||
self._is_encrypted: bool = False
|
||||
self._cipher: Final = (
|
||||
self._cipher: Final[Cipher | None] = (
|
||||
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
|
||||
if len(home_key) == 16
|
||||
else None
|
||||
@@ -112,7 +129,7 @@ class PowerViewBLE:
|
||||
return self._is_encrypted
|
||||
|
||||
@encrypted.setter
|
||||
def encrypted(self, value:bool) -> None:
|
||||
def encrypted(self, value: bool) -> None:
|
||||
self._is_encrypted = value
|
||||
|
||||
@property
|
||||
@@ -126,9 +143,7 @@ class PowerViewBLE:
|
||||
return self._client.is_connected
|
||||
|
||||
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
|
||||
async def _cmd(
|
||||
self, cmd: tuple[ShadeCmd, bytearray], disconnect: bool = True
|
||||
) -> None:
|
||||
async def _cmd(self, cmd: tuple[ShadeCmd, bytes], disconnect: bool = True) -> None:
|
||||
self._cmd_next = cmd
|
||||
if self._cmd_lock.locked():
|
||||
LOGGER.debug("%s: device busy, queuing %s command", self.name, cmd[0])
|
||||
@@ -137,19 +152,18 @@ class PowerViewBLE:
|
||||
async with self._cmd_lock:
|
||||
try:
|
||||
await self._connect()
|
||||
cmd_run = self._cmd_next
|
||||
tx_data = (
|
||||
bytearray(
|
||||
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
|
||||
+ bytes([self._seqcnt, len(cmd_run[1])])
|
||||
)
|
||||
cmd_run: tuple[ShadeCmd, bytes] = self._cmd_next
|
||||
tx_data: bytes = bytes(
|
||||
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
|
||||
+ bytes([self._seqcnt, len(cmd_run[1])])
|
||||
+ cmd_run[1]
|
||||
)
|
||||
LOGGER.debug("sending cmd: %s", tx_data.hex(" "))
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
enc = self._cipher.encryptor()
|
||||
enc: AEADEncryptionContext = self._cipher.encryptor()
|
||||
tx_data = enc.update(tx_data) + enc.finalize()
|
||||
LOGGER.debug(" encrypted: %s", tx_data.hex(" "))
|
||||
self._data_event.clear()
|
||||
LOGGER.debug("sending cmd: %s", tx_data)
|
||||
await self._client.write_gatt_char(UUID_TX, tx_data, False)
|
||||
self._seqcnt += 1
|
||||
LOGGER.debug("waiting for response")
|
||||
@@ -171,13 +185,13 @@ class PowerViewBLE:
|
||||
if len(data) != 9:
|
||||
LOGGER.debug("not a V2 record!")
|
||||
return []
|
||||
pos = int.from_bytes(data[3:5], byteorder="little")
|
||||
pos2 = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||
pos: Final[int] = int.from_bytes(data[3:5], byteorder="little")
|
||||
pos2: Final[int] = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||
return [
|
||||
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
|
||||
("position2", pos2 >> 2),
|
||||
("position3", int(data[6])),
|
||||
("tilt", int(data[7])),
|
||||
(ATTR_CURRENT_TILT_POSITION, int(data[7])),
|
||||
("home_id", int.from_bytes(data[0:2], byteorder="little")),
|
||||
("type_id", int.from_bytes(data[2:3])),
|
||||
("is_opening", bool(pos & 0x3 == 0x2)),
|
||||
@@ -189,16 +203,31 @@ class PowerViewBLE:
|
||||
]
|
||||
|
||||
# position cmd: uint16_t pos1, uint16_t pos2, uint16_t pos3, uint16_t tilt, uint8_t velocity
|
||||
async def set_position(self, value: int, disconnect: bool = True) -> None:
|
||||
async def set_position(
|
||||
self,
|
||||
pos1: int,
|
||||
pos2: int | None = None,
|
||||
pos3: int | None = None,
|
||||
tilt: int | None = None,
|
||||
velocity: int = 0x0,
|
||||
disconnect: bool = True,
|
||||
) -> None:
|
||||
"""Set position of device."""
|
||||
LOGGER.debug("%s setting position to %i", self.name, value)
|
||||
LOGGER.debug("%s setting position to %i, tilt %i", self.name, pos1, tilt)
|
||||
await self._cmd(
|
||||
(
|
||||
ShadeCmd.SET_POSITION,
|
||||
bytearray(
|
||||
int.to_bytes(value * 100, 2, byteorder="little")
|
||||
+ bytes([0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x0])
|
||||
),
|
||||
int.to_bytes(pos1, 2, byteorder="little")
|
||||
+ int.to_bytes(
|
||||
pos2 if pos2 is not None else 0x8000, 2, byteorder="little"
|
||||
)
|
||||
+ int.to_bytes(
|
||||
pos3 if pos3 is not None else 0x8000, 2, byteorder="little"
|
||||
)
|
||||
+ int.to_bytes(
|
||||
tilt if tilt is not None else 0x8000, 2, byteorder="little"
|
||||
)
|
||||
+ int.to_bytes(velocity, 1),
|
||||
),
|
||||
disconnect,
|
||||
)
|
||||
@@ -211,7 +240,7 @@ class PowerViewBLE:
|
||||
async def stop(self) -> None:
|
||||
"""Stop device movement."""
|
||||
LOGGER.debug("%s stop", self.name)
|
||||
await self._cmd((ShadeCmd.STOP, bytearray(b"")))
|
||||
await self._cmd((ShadeCmd.STOP, b""))
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Fully close cover."""
|
||||
@@ -227,7 +256,7 @@ class PowerViewBLE:
|
||||
await self._cmd(
|
||||
(
|
||||
ShadeCmd.ACTIVATE_SCENE,
|
||||
bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])),
|
||||
int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2]),
|
||||
),
|
||||
)
|
||||
|
||||
@@ -235,7 +264,7 @@ class PowerViewBLE:
|
||||
"""Verify shade response data."""
|
||||
data: bytearray = din
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
dec = self._cipher.decryptor()
|
||||
dec: AEADDecryptionContext = self._cipher.decryptor()
|
||||
data = bytearray(dec.update(din) + dec.finalize())
|
||||
if len(data) < 4:
|
||||
LOGGER.error("Reponse message too short")
|
||||
@@ -252,7 +281,7 @@ class PowerViewBLE:
|
||||
LOGGER.error("Wrong response data length")
|
||||
return False
|
||||
if int(data[4] != 0):
|
||||
LOGGER.error("Command %d returned error #%d", cmd.value, int(data[4]))
|
||||
LOGGER.error("Command %X returned error #%d", cmd.value, int(data[4]))
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -279,6 +308,9 @@ class PowerViewBLE:
|
||||
.copy()
|
||||
.decode("UTF-8")
|
||||
)
|
||||
except BleakError as ex:
|
||||
LOGGER.debug("%s: querying failed: %s", self.name, ex)
|
||||
raise
|
||||
finally:
|
||||
await self.disconnect()
|
||||
LOGGER.debug("%s device data: %s", self.name, data)
|
||||
@@ -290,8 +322,17 @@ class PowerViewBLE:
|
||||
LOGGER.debug("Disconnected from %s", client.address)
|
||||
|
||||
def _notification_handler(self, _sender, data: bytearray) -> None:
|
||||
LOGGER.debug("%s received BLE data: %s", self.name, data)
|
||||
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
|
||||
self._data = data
|
||||
if self._cipher is not None and self._is_encrypted:
|
||||
dec: AEADDecryptionContext = self._cipher.decryptor()
|
||||
self._data = bytearray(dec.update(data) + dec.finalize())
|
||||
LOGGER.debug(
|
||||
"%s %s",
|
||||
"decoded data: ".rjust(19 + len(self.name)),
|
||||
self._data.hex(" "),
|
||||
)
|
||||
|
||||
self._data_event.set()
|
||||
|
||||
async def _connect(self) -> None:
|
||||
@@ -303,7 +344,7 @@ class PowerViewBLE:
|
||||
LOGGER.debug("%s already connected", self.name)
|
||||
return
|
||||
|
||||
start = time.time()
|
||||
start: float = time.time()
|
||||
self._client = await establish_connection(
|
||||
BleakClient,
|
||||
self._ble_device,
|
||||
@@ -311,7 +352,7 @@ class PowerViewBLE:
|
||||
disconnected_callback=self._on_disconnect,
|
||||
services=[
|
||||
UUID_COV_SERVICE,
|
||||
# self.UUID_DEV_SERVICE,
|
||||
UUID_DEV_SERVICE,
|
||||
# self.UUID_BAT_SERVICE,
|
||||
],
|
||||
)
|
||||
|
||||
@@ -8,7 +8,9 @@ from homeassistant.components.bluetooth.passive_update_coordinator import (
|
||||
)
|
||||
from homeassistant.components.cover import (
|
||||
ATTR_CURRENT_POSITION,
|
||||
ATTR_CURRENT_TILT_POSITION,
|
||||
ATTR_POSITION,
|
||||
ATTR_TILT_POSITION,
|
||||
CoverDeviceClass,
|
||||
CoverEntity,
|
||||
CoverEntityFeature,
|
||||
@@ -31,11 +33,15 @@ async def async_setup_entry(
|
||||
"""Set up the demo cover platform."""
|
||||
|
||||
coordinator: PVCoordinator = config_entry.runtime_data
|
||||
async_add_entities([PowerViewCover(coordinator)])
|
||||
async_add_entities(
|
||||
[PowerViewCoverTilt(coordinator)]
|
||||
if coordinator.dev_details.get("model") in ["51", "62"]
|
||||
else [PowerViewCover(coordinator)]
|
||||
)
|
||||
|
||||
|
||||
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Representation of a powerview shade."""
|
||||
"""Representation of a PowerView shade with Up/Down functionality only."""
|
||||
|
||||
_attr_has_entity_name = True
|
||||
_attr_device_class = CoverDeviceClass.SHADE
|
||||
@@ -51,8 +57,9 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
coordinator: PVCoordinator,
|
||||
) -> None:
|
||||
"""Initialize the shade."""
|
||||
LOGGER.debug("%s: init() PowerViewCover", coordinator.name)
|
||||
self._attr_name = CoverDeviceClass.SHADE
|
||||
self._coord = coordinator
|
||||
self._coord: PVCoordinator = coordinator
|
||||
self._attr_device_info = self._coord.device_info
|
||||
self._target_position: int | None = round(
|
||||
self._coord.data.get(ATTR_CURRENT_POSITION, OPEN_POSITION)
|
||||
@@ -170,3 +177,74 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error("Failed to stop cover '%s': %s", self.name, err)
|
||||
|
||||
|
||||
class PowerViewCoverTilt(PowerViewCover):
|
||||
"""Representation of a PowerView shade with additional tilt functionality."""
|
||||
|
||||
_attr_supported_features = (
|
||||
CoverEntityFeature.OPEN
|
||||
| CoverEntityFeature.CLOSE
|
||||
| CoverEntityFeature.STOP
|
||||
| CoverEntityFeature.SET_POSITION
|
||||
| CoverEntityFeature.OPEN_TILT
|
||||
| CoverEntityFeature.CLOSE_TILT
|
||||
| CoverEntityFeature.STOP_TILT
|
||||
| CoverEntityFeature.SET_TILT_POSITION
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: PVCoordinator,
|
||||
) -> None:
|
||||
LOGGER.debug("%s: init() PowerViewCoverTilt", coordinator.name)
|
||||
super().__init__(coordinator)
|
||||
|
||||
@property
|
||||
def current_cover_tilt_position(self) -> int | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||
"""Return current tilt of cover.
|
||||
|
||||
None is unknown
|
||||
"""
|
||||
pos: Final = self._coord.data.get(ATTR_CURRENT_TILT_POSITION)
|
||||
return round(pos) if pos is not None else None
|
||||
|
||||
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
|
||||
"""Move the tilt to a specific position."""
|
||||
|
||||
if isinstance(target_position := kwargs.get(ATTR_TILT_POSITION), int):
|
||||
LOGGER.debug("set cover tilt to position %i", target_position)
|
||||
if (
|
||||
self.current_cover_tilt_position == round(target_position)
|
||||
or self.current_cover_position is None
|
||||
):
|
||||
return
|
||||
|
||||
try:
|
||||
await self._coord.api.set_position(
|
||||
self.current_cover_position, tilt=target_position
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
except BleakError as err:
|
||||
LOGGER.error(
|
||||
"Failed to tilt cover '%s' to %f%%: %s",
|
||||
self.name,
|
||||
target_position,
|
||||
err,
|
||||
)
|
||||
|
||||
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Stop the cover."""
|
||||
await self.async_stop_cover(kwargs=kwargs)
|
||||
|
||||
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Open the cover tilt."""
|
||||
LOGGER.debug("open cover tilt")
|
||||
_kwargs = {**kwargs, ATTR_TILT_POSITION: OPEN_POSITION}
|
||||
await self.async_set_cover_tilt_position(**_kwargs)
|
||||
|
||||
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
|
||||
"""Close the cover tilt."""
|
||||
LOGGER.debug("close cover tilt")
|
||||
_kwargs = {**kwargs, ATTR_TILT_POSITION: CLOSED_POSITION}
|
||||
await self.async_set_cover_tilt_position(**_kwargs)
|
||||
|
||||
@@ -4,8 +4,7 @@
|
||||
"bluetooth": [
|
||||
{
|
||||
"service_uuid": "0000fdc1-0000-1000-8000-00805f9b34fb",
|
||||
"manufacturer_id": 2073,
|
||||
"manufacturer_data_start": [0,0]
|
||||
"manufacturer_id": 2073
|
||||
}
|
||||
],
|
||||
"codeowners": ["@patman15"],
|
||||
|
||||
Reference in New Issue
Block a user