support setting tilt
This commit is contained in:
@@ -71,6 +71,7 @@ class ShadeCmd(Enum):
|
|||||||
SET_POSITION = 0x01F7
|
SET_POSITION = 0x01F7
|
||||||
STOP = 0xB8F7
|
STOP = 0xB8F7
|
||||||
ACTIVATE_SCENE = 0xBAF7
|
ACTIVATE_SCENE = 0xBAF7
|
||||||
|
IDENTIFY = 0x11F7
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -154,11 +155,12 @@ class PowerViewBLE:
|
|||||||
+ bytes([self._seqcnt, len(cmd_run[1])])
|
+ bytes([self._seqcnt, len(cmd_run[1])])
|
||||||
+ cmd_run[1]
|
+ cmd_run[1]
|
||||||
)
|
)
|
||||||
|
LOGGER.debug("sending cmd: %s", tx_data.hex(" "))
|
||||||
if self._cipher is not None and self._is_encrypted:
|
if self._cipher is not None and self._is_encrypted:
|
||||||
enc: AEADEncryptionContext = self._cipher.encryptor()
|
enc: AEADEncryptionContext = self._cipher.encryptor()
|
||||||
tx_data = enc.update(tx_data) + enc.finalize()
|
tx_data = enc.update(tx_data) + enc.finalize()
|
||||||
|
LOGGER.debug(" encrypted: %s", tx_data.hex(" "))
|
||||||
self._data_event.clear()
|
self._data_event.clear()
|
||||||
LOGGER.debug("sending cmd: %s", tx_data)
|
|
||||||
await self._client.write_gatt_char(UUID_TX, tx_data, False)
|
await self._client.write_gatt_char(UUID_TX, tx_data, False)
|
||||||
self._seqcnt += 1
|
self._seqcnt += 1
|
||||||
LOGGER.debug("waiting for response")
|
LOGGER.debug("waiting for response")
|
||||||
@@ -180,8 +182,8 @@ class PowerViewBLE:
|
|||||||
if len(data) != 9:
|
if len(data) != 9:
|
||||||
LOGGER.debug("not a V2 record!")
|
LOGGER.debug("not a V2 record!")
|
||||||
return []
|
return []
|
||||||
pos: int = int.from_bytes(data[3:5], byteorder="little")
|
pos: Final[int] = int.from_bytes(data[3:5], byteorder="little")
|
||||||
pos2: int = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
pos2: Final[int] = (int(data[5]) << 4) + (int(data[4]) >> 4)
|
||||||
return [
|
return [
|
||||||
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
|
(ATTR_CURRENT_POSITION, ((pos >> 2) / 10)),
|
||||||
("position2", pos2 >> 2),
|
("position2", pos2 >> 2),
|
||||||
@@ -270,7 +272,7 @@ class PowerViewBLE:
|
|||||||
LOGGER.error("Wrong response data length")
|
LOGGER.error("Wrong response data length")
|
||||||
return False
|
return False
|
||||||
if int(data[4] != 0):
|
if int(data[4] != 0):
|
||||||
LOGGER.error("Command %d returned error #%d", cmd.value, int(data[4]))
|
LOGGER.error("Command %X returned error #%d", cmd.value, int(data[4]))
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -308,8 +310,13 @@ class PowerViewBLE:
|
|||||||
LOGGER.debug("Disconnected from %s", client.address)
|
LOGGER.debug("Disconnected from %s", client.address)
|
||||||
|
|
||||||
def _notification_handler(self, _sender, data: bytearray) -> None:
|
def _notification_handler(self, _sender, data: bytearray) -> None:
|
||||||
LOGGER.debug("%s received BLE data: %s", self.name, data)
|
LOGGER.debug("%s received BLE data: %s", self.name, data.hex(" "))
|
||||||
self._data = data
|
self._data = data
|
||||||
|
if self._cipher is not None and self._is_encrypted:
|
||||||
|
dec: AEADDecryptionContext = self._cipher.decryptor()
|
||||||
|
self._data = bytearray(dec.update(data) + dec.finalize())
|
||||||
|
LOGGER.debug("%s %s", "decoded data: ".rjust(19+len(self.name)), self._data.hex(" "))
|
||||||
|
|
||||||
self._data_event.set()
|
self._data_event.set()
|
||||||
|
|
||||||
async def _connect(self) -> None:
|
async def _connect(self) -> None:
|
||||||
@@ -321,7 +328,7 @@ class PowerViewBLE:
|
|||||||
LOGGER.debug("%s already connected", self.name)
|
LOGGER.debug("%s already connected", self.name)
|
||||||
return
|
return
|
||||||
|
|
||||||
start = time.time()
|
start: float = time.time()
|
||||||
self._client = await establish_connection(
|
self._client = await establish_connection(
|
||||||
BleakClient,
|
BleakClient,
|
||||||
self._ble_device,
|
self._ble_device,
|
||||||
|
|||||||
@@ -8,7 +8,9 @@ from homeassistant.components.bluetooth.passive_update_coordinator import (
|
|||||||
)
|
)
|
||||||
from homeassistant.components.cover import (
|
from homeassistant.components.cover import (
|
||||||
ATTR_CURRENT_POSITION,
|
ATTR_CURRENT_POSITION,
|
||||||
|
ATTR_CURRENT_TILT_POSITION,
|
||||||
ATTR_POSITION,
|
ATTR_POSITION,
|
||||||
|
ATTR_TILT_POSITION,
|
||||||
CoverDeviceClass,
|
CoverDeviceClass,
|
||||||
CoverEntity,
|
CoverEntity,
|
||||||
CoverEntityFeature,
|
CoverEntityFeature,
|
||||||
@@ -35,7 +37,7 @@ async def async_setup_entry(
|
|||||||
|
|
||||||
|
|
||||||
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
|
class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEntity): # type: ignore[reportIncompatibleVariableOverride]
|
||||||
"""Representation of a powerview shade."""
|
"""Representation of a PowerView shade with Up/Down functionality only."""
|
||||||
|
|
||||||
_attr_has_entity_name = True
|
_attr_has_entity_name = True
|
||||||
_attr_device_class = CoverDeviceClass.SHADE
|
_attr_device_class = CoverDeviceClass.SHADE
|
||||||
@@ -170,3 +172,50 @@ class PowerViewCover(PassiveBluetoothCoordinatorEntity[PVCoordinator], CoverEnti
|
|||||||
self.async_write_ha_state()
|
self.async_write_ha_state()
|
||||||
except BleakError as err:
|
except BleakError as err:
|
||||||
LOGGER.error("Failed to stop cover '%s': %s", self.name, err)
|
LOGGER.error("Failed to stop cover '%s': %s", self.name, err)
|
||||||
|
|
||||||
|
|
||||||
|
class PowerViewCoverTilt(PowerViewCover):
|
||||||
|
"""Representation of a PowerView shade with additional tilt functionality."""
|
||||||
|
|
||||||
|
_attr_supported_features = (
|
||||||
|
CoverEntityFeature.OPEN
|
||||||
|
| CoverEntityFeature.CLOSE
|
||||||
|
| CoverEntityFeature.SET_POSITION
|
||||||
|
| CoverEntityFeature.STOP
|
||||||
|
# | CoverEntityFeature.CLOSE_TILT
|
||||||
|
| CoverEntityFeature.SET_TILT_POSITION
|
||||||
|
# | CoverEntityFeature.OPEN_TILT
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_cover_tilt_position(self) -> int | None: # type: ignore[reportIncompatibleVariableOverride]
|
||||||
|
"""Return current tilt of cover.
|
||||||
|
|
||||||
|
None is unknown
|
||||||
|
"""
|
||||||
|
pos: Final = self._coord.data.get(ATTR_CURRENT_TILT_POSITION)
|
||||||
|
return round(pos) if pos is not None else None
|
||||||
|
|
||||||
|
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
|
||||||
|
"""Move the tilt to a specific position."""
|
||||||
|
|
||||||
|
if isinstance(target_position := kwargs.get(ATTR_TILT_POSITION), int):
|
||||||
|
LOGGER.debug("set cover tilt to position %i", target_position)
|
||||||
|
if (
|
||||||
|
self.current_cover_tilt_position == round(target_position)
|
||||||
|
or self.current_cover_position is None
|
||||||
|
):
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._coord.api.set_position(
|
||||||
|
self.current_cover_position, tilt=target_position
|
||||||
|
)
|
||||||
|
self.async_write_ha_state()
|
||||||
|
except BleakError as err:
|
||||||
|
LOGGER.error(
|
||||||
|
"Failed to tilt cover '%s' to %f%%: %s",
|
||||||
|
self.name,
|
||||||
|
target_position,
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user