11 Commits

Author SHA1 Message Date
patman15
49bb1e79c9 add tilt functions 2025-01-07 09:50:41 +01:00
patman15
d73bb1d1c4 completed tilt control 2025-01-06 19:46:52 +01:00
patman15
e00e04159a support setting tilt 2025-01-05 19:31:26 +01:00
patman15
ea33e08da0 extend set_position() for all values 2025-01-05 12:12:16 +01:00
patman15
8c5c3d3c9a Update .gitignore 2024-12-29 13:00:15 +01:00
patman15
0299a4ed24 Merge branch 'main' into feature/type_8 2024-12-29 12:49:44 +01:00
patman15
21d31ba38a Merge branch 'main' into feature/type_8 2024-12-22 13:36:41 +01:00
patman15
b6b7e43b02 stronger typing 2024-12-22 13:35:29 +01:00
patman15
b550f98e2b Merge branch 'main' into feature/type_8 2024-12-21 09:10:50 +01:00
patman15
f6460280db Merge branch 'main' into feature/type_8 2024-12-20 21:16:37 +01:00
patman15
65b7b3814b disabled home_id filter 2024-12-20 20:41:01 +01:00
4 changed files with 158 additions and 37 deletions

3
.gitignore vendored
View File

@@ -152,3 +152,6 @@ cython_debug/
#.idea/
aes.py
*.bak
emu/PV_BLE_cover/PV_BLE_cover.ino
img/
*.zip

View File

@@ -12,7 +12,14 @@ from bleak.exc import BleakError
from bleak.uuids import normalize_uuid_str
from bleak_retry_connector import establish_connection
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from homeassistant.components.cover import ATTR_CURRENT_POSITION
from cryptography.hazmat.primitives.ciphers.base import (
AEADDecryptionContext,
AEADEncryptionContext,
)
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
)
from .const import LOGGER, TIMEOUT
@@ -25,6 +32,7 @@ ATTR_ACTIVITY: Final[str] = "activity"
SHADE_TYPE: Final[dict[int, str]] = {
# up down only
1: "Designer Roller",
4: "Roman",
5: "Bottom Up",
@@ -38,6 +46,14 @@ SHADE_TYPE: Final[dict[int, str]] = {
52: "Banded Shades",
53: "Sonnette",
84: "Vignette",
# top down bottom up
8: "Duette, Top Down Bottom Up",
9: "Duette DuoLite, Top Down Bottom Up",
33: "Duette Architella, Top Down Bottom Up",
47: "Pleated, Top Down Bottom Up",
# top down, tilt anywhere
51: "Venetian, Tilt Anywhere",
62: "Venetian, Tilt Anywhere",
}
OPEN_POSITION: Final[int] = 100
@@ -58,6 +74,7 @@ class ShadeCmd(Enum):
SET_POSITION = 0x01F7
STOP = 0xB8F7
ACTIVATE_SCENE = 0xBAF7
IDENTIFY = 0x11F7
@dataclass
@@ -91,12 +108,12 @@ class PowerViewBLE:
],
)
self._data_event = asyncio.Event()
self._data: bytearray
self._data: bytearray = bytearray()
self._info: PVDeviceInfo = PVDeviceInfo()
self._cmd_lock: Final = asyncio.Lock()
self._cmd_next = None
self._cmd_next: tuple[ShadeCmd, bytes]
self._is_encrypted: bool = False
self._cipher: Final = (
self._cipher: Final[Cipher | None] = (
Cipher(algorithms.AES(home_key), modes.CTR(bytearray(16)))
if len(home_key) == 16
else None
@@ -112,7 +129,7 @@ class PowerViewBLE:
return self._is_encrypted
@encrypted.setter
def encrypted(self, value:bool) -> None:
def encrypted(self, value: bool) -> None:
self._is_encrypted = value
@property
@@ -126,9 +143,7 @@ class PowerViewBLE:
return self._client.is_connected
# general cmd: uint16_t cmd, uint8_t seqID, uint8_t data_len
async def _cmd(
self, cmd: tuple[ShadeCmd, bytearray], disconnect: bool = True
) -> None:
async def _cmd(self, cmd: tuple[ShadeCmd, bytes], disconnect: bool = True) -> None:
self._cmd_next = cmd
if self._cmd_lock.locked():
LOGGER.debug("%s: device busy, queuing %s command", self.name, cmd[0])
@@ -137,19 +152,18 @@ class PowerViewBLE:
async with self._cmd_lock:
try:
await self._connect()
cmd_run = self._cmd_next
tx_data = (
bytearray(
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
+ bytes([self._seqcnt, len(cmd_run[1])])
)
cmd_run: tuple[ShadeCmd, bytes] = self._cmd_next
tx_data: bytes = bytes(
int.to_bytes(cmd_run[0].value, 2, byteorder="little")
+ bytes([self._seqcnt, len(cmd_run[1])])
+ cmd_run[1]
)
LOGGER.debug("sending cmd: %s", tx_data.hex(" "))
if self._cipher is not None and self._is_encrypted:
enc = self._cipher.encryptor()
enc: AEADEncryptionContext = self._cipher.encryptor()
tx_data = enc.update(tx_data) + enc.finalize()
LOGGER.debug(" encrypted: %s", tx_data.hex(" "))
self._data_event.clear()
LOGGER.debug("sending cmd: %s", tx_data)
await self._client.write_gatt_char(UUID_TX, tx_data, False)
self._seqcnt += 1
LOGGER.debug("waiting for response")
@@ -171,13 +185,13 @@ class PowerViewBLE:
if len(data) != 9:
LOGGER.debug("not a V2 record!")
return []
pos = int.from_bytes(data[3:5], byteorder="little")
pos2 = (int(data[5]) << 4) + (int(data[4]) >> 4)
pos: Final[int] = int.from_bytes(data[3:5], byteorder="little")
pos2: Final[int] = (int(data[5]) << 4) + (int(data[4]) >> 4)
return [
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
("position2", pos2 >> 2),
("position3", int(data[6])),
("tilt", int(data[7])),
(ATTR_CURRENT_TILT_POSITION, int(data[7])),
("home_id", int.from_bytes(data[0:2], byteorder="little")),
("type_id", int.from_bytes(data[2:3])),
("is_opening", bool(pos & 0x3 == 0x2)),
@@ -189,16 +203,31 @@ class PowerViewBLE:
]
# position cmd: uint16_t pos1, uint16_t pos2, uint16_t pos3, uint16_t tilt, uint8_t velocity
async def set_position(self, value: int, disconnect: bool = True) -> None:
async def set_position(
self,
pos1: int,
pos2: int | None = None,
pos3: int | None = None,
tilt: int | None = None,
velocity: int = 0x0,
disconnect: bool = True,
) -> None:
"""Set position of device."""
LOGGER.debug("%s setting position to %i", self.name, value)
LOGGER.debug("%s setting position to %i, tilt %i", self.name, pos1, tilt)
await self._cmd(
(
ShadeCmd.SET_POSITION,
bytearray(
int.to_bytes(value * 100, 2, byteorder="little")
+ bytes([0x00, 0x80, 0x00, 0x80, 0x00, 0x80, 0x0])
),
int.to_bytes(pos1, 2, byteorder="little")
+ int.to_bytes(
pos2 if pos2 is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(
pos3 if pos3 is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(
tilt if tilt is not None else 0x8000, 2, byteorder="little"
)
+ int.to_bytes(velocity, 1),
),
disconnect,
)
@@ -211,7 +240,7 @@ class PowerViewBLE:
async def stop(self) -> None:
"""Stop device movement."""
LOGGER.debug("%s stop", self.name)
await self._cmd((ShadeCmd.STOP, bytearray(b"")))
await self._cmd((ShadeCmd.STOP, b""))
async def close(self) -> None:
"""Fully close cover."""
@@ -227,7 +256,7 @@ class PowerViewBLE:
await self._cmd(
(
ShadeCmd.ACTIVATE_SCENE,
bytearray(int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2])),
int.to_bytes(idx, 1, byteorder="little") + bytes([0xA2]),
),
)
@@ -235,7 +264,7 @@ class PowerViewBLE:
"""Verify shade response data."""
data: bytearray = din
if self._cipher is not None and self._is_encrypted:
dec = self._cipher.decryptor()
dec: AEADDecryptionContext = self._cipher.decryptor()
data = bytearray(dec.update(din) + dec.finalize())
if len(data) < 4:
LOGGER.error("Reponse message too short")
@@ -252,7 +281,7 @@ class PowerViewBLE:
LOGGER.error("Wrong response data length")
return False
if int(data[4] != 0):
LOGGER.error("Command %d returned error #%d", cmd.value, int(data[4]))
LOGGER.error("Command %X returned error #%d", cmd.value, int(data[4]))
return False
return True
@@ -279,6 +308,9 @@ class PowerViewBLE:
.copy()
.decode("UTF-8")
)
except BleakError as ex:
LOGGER.debug("%s: querying failed: %s", self.name, ex)
raise
finally:
await self.disconnect()
LOGGER.debug("%s device data: %s", self.name, data)
@@ -290,8 +322,17 @@ class PowerViewBLE:
LOGGER.debug("Disconnected from %s", client.address)
def _notification_handler(self, _sender, data: bytearray) -> None:
LOGGER.debug("%s received BLE data: %s", self.name, data)
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
self._data = data
if self._cipher is not None and self._is_encrypted:
dec: AEADDecryptionContext = self._cipher.decryptor()
self._data = bytearray(dec.update(data) + dec.finalize())
LOGGER.debug(
"%s %s",
"decoded data: ".rjust(19 + len(self.name)),
self._data.hex(" "),
)
self._data_event.set()
async def _connect(self) -> None:
@@ -303,7 +344,7 @@ class PowerViewBLE:
LOGGER.debug("%s already connected", self.name)
return
start = time.time()
start: float = time.time()
self._client = await establish_connection(
BleakClient,
self._ble_device,
@@ -311,7 +352,7 @@ class PowerViewBLE:
disconnected_callback=self._on_disconnect,
services=[
UUID_COV_SERVICE,
# self.UUID_DEV_SERVICE,
UUID_DEV_SERVICE,
# self.UUID_BAT_SERVICE,
],
)

View File

@@ -8,7 +8,9 @@ from homeassistant.components.bluetooth.passive_update_coordinator import (
)
from homeassistant.components.cover import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
ATTR_POSITION,
ATTR_TILT_POSITION,
CoverDeviceClass,
CoverEntity,
CoverEntityFeature,
@@ -31,11 +33,15 @@ async def async_setup_entry(
"""Set up the demo cover platform."""
coordinator: PVCoordinator = config_entry.runtime_data
async_add_entities([PowerViewCover(coordinator)])
async_add_entities(
[PowerViewCoverTilt(coordinator)]
if coordinator.dev_details.get("model") in ["51", "62"]
else [PowerViewCover(coordinator)]
)
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
"""Representation of a powerview shade."""
"""Representation of a PowerView shade with Up/Down functionality only."""
_attr_has_entity_name = True
_attr_device_class = CoverDeviceClass.SHADE
@@ -51,8 +57,9 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
coordinator: PVCoordinator,
) -> None:
"""Initialize the shade."""
LOGGER.debug("%s: init() PowerViewCover", coordinator.name)
self._attr_name = CoverDeviceClass.SHADE
self._coord = coordinator
self._coord: PVCoordinator = coordinator
self._attr_device_info = self._coord.device_info
self._target_position: int | None = round(
self._coord.data.get(ATTR_CURRENT_POSITION, OPEN_POSITION)
@@ -170,3 +177,74 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
self.async_write_ha_state()
except BleakError as err:
LOGGER.error("Failed to stop cover '%s': %s", self.name, err)
class PowerViewCoverTilt(PowerViewCover):
"""Representation of a PowerView shade with additional tilt functionality."""
_attr_supported_features = (
CoverEntityFeature.OPEN
| CoverEntityFeature.CLOSE
| CoverEntityFeature.STOP
| CoverEntityFeature.SET_POSITION
| CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
| CoverEntityFeature.STOP_TILT
| CoverEntityFeature.SET_TILT_POSITION
)
def __init__(
self,
coordinator: PVCoordinator,
) -> None:
LOGGER.debug("%s: init() PowerViewCoverTilt", coordinator.name)
super().__init__(coordinator)
@property
def current_cover_tilt_position(self) -> int | None: # type: ignore[reportIncompatibleVariableOverride]
"""Return current tilt of cover.
None is unknown
"""
pos: Final = self._coord.data.get(ATTR_CURRENT_TILT_POSITION)
return round(pos) if pos is not None else None
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the tilt to a specific position."""
if isinstance(target_position := kwargs.get(ATTR_TILT_POSITION), int):
LOGGER.debug("set cover tilt to position %i", target_position)
if (
self.current_cover_tilt_position == round(target_position)
or self.current_cover_position is None
):
return
try:
await self._coord.api.set_position(
self.current_cover_position, tilt=target_position
)
self.async_write_ha_state()
except BleakError as err:
LOGGER.error(
"Failed to tilt cover '%s' to %f%%: %s",
self.name,
target_position,
err,
)
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover."""
await self.async_stop_cover(kwargs=kwargs)
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
LOGGER.debug("open cover tilt")
_kwargs = {**kwargs, ATTR_TILT_POSITION: OPEN_POSITION}
await self.async_set_cover_tilt_position(**_kwargs)
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
LOGGER.debug("close cover tilt")
_kwargs = {**kwargs, ATTR_TILT_POSITION: CLOSED_POSITION}
await self.async_set_cover_tilt_position(**_kwargs)

View File

@@ -4,8 +4,7 @@
"bluetooth": [
{
"service_uuid": "0000fdc1-0000-1000-8000-00805f9b34fb",
"manufacturer_id": 2073,
"manufacturer_data_start": [0,0]
"manufacturer_id": 2073
}
],
"codeowners": ["@patman15"],