Update extract_gateway3_homekey.py
This commit is contained in:
@@ -1,71 +1,99 @@
|
|||||||
import requests
|
"""Extract PowerView homekey from a G3 PowerView Gateway"""
|
||||||
import json
|
|
||||||
import base64
|
import base64
|
||||||
|
import json
|
||||||
import struct
|
import struct
|
||||||
|
from typing import Any, Final
|
||||||
|
|
||||||
HUB = "http://192.168.0.184"
|
import requests
|
||||||
|
|
||||||
def create_request(sid, cid, sequenceId, data):
|
HUB: Final[str] = "http://powerview-g3.local"
|
||||||
data = struct.pack("<BBBB", sid, cid, sequenceId, len(data)) + data
|
TIMEOUT: Final[int] = 10
|
||||||
return data
|
|
||||||
|
|
||||||
def decode_response(packet):
|
|
||||||
|
def create_request(sid: int, cid: int, sequence_id: int, data: bytes) -> bytes:
|
||||||
|
"""Assemble a request frame for the PowerView protocol."""
|
||||||
|
return struct.pack("<BBBB", sid, cid, sequence_id, len(data)) + data
|
||||||
|
|
||||||
|
|
||||||
|
def decode_response(packet: bytes) -> dict[str, Any]:
|
||||||
if len(packet) < 4:
|
if len(packet) < 4:
|
||||||
raise Exception('Packet size too small')
|
raise ValueError("Packet size too small")
|
||||||
sid, cid, sequenceId, length = struct.unpack("<BBBB", packet[0:4])
|
sid, cid, sequence_id, length = struct.unpack("<BBBB", packet[0:4])
|
||||||
if len(packet) != 4 + length:
|
if len(packet) != 4 + length:
|
||||||
raise Exception('Not all data present')
|
raise ValueError("Not all data present")
|
||||||
if length < 1:
|
if length < 1:
|
||||||
raise Exception('No errorCode present')
|
raise ValueError("No errorCode present")
|
||||||
errorCode, = struct.unpack("<B", packet[4:5])
|
(error_code,) = struct.unpack("<B", packet[4:5])
|
||||||
data = packet[5:]
|
data: Final[bytes] = packet[5:]
|
||||||
return {
|
return {
|
||||||
'cid': cid,
|
"cid": cid,
|
||||||
'sid': sid,
|
"sid": sid,
|
||||||
'sequenceId': sequenceId,
|
"sequenceId": sequence_id,
|
||||||
'errorCode': errorCode,
|
"errorCode": error_code,
|
||||||
'data': data
|
"data": data,
|
||||||
}
|
}
|
||||||
|
|
||||||
def create_get_shade_key_request(sequenceId):
|
|
||||||
return create_request(251, 18, sequenceId, b"")
|
|
||||||
|
|
||||||
def get_shade_key(hub, bleName):
|
def create_get_shade_key_request(sequence_id) -> bytes:
|
||||||
shades_exec_resp = requests.post(hub + "/home/shades/exec?shades=" + bleName, json={"hex":create_get_shade_key_request(1).hex()})
|
"""Create a GetShadeKey request frame."""
|
||||||
if shades_exec_resp.status_code != 200:
|
return create_request(251, 18, sequence_id, b"")
|
||||||
raise Exception('Unable to send GetShadeKey')
|
|
||||||
result = json.loads(shades_exec_resp.content)
|
|
||||||
if not 'err' in result or result['err'] != 0 or not 'responses' in result or len(result['responses']) != 1:
|
|
||||||
raise Exception('Error when attempting GetShadeKey')
|
|
||||||
result = result['responses'][0]
|
|
||||||
result = bytes.fromhex(result['hex'])
|
|
||||||
result = decode_response(result)
|
|
||||||
if result['errorCode'] != 0:
|
|
||||||
raise Exception('BLE errorCode is not 0')
|
|
||||||
if len(result['data']) != 16:
|
|
||||||
raise Exception('Expected 16 byte homekey')
|
|
||||||
return result['data']
|
|
||||||
|
|
||||||
|
|
||||||
def main(hub):
|
def get_shade_key(hub: str, ble_name) -> bytes:
|
||||||
shades_resp = requests.get(hub + "/home/shades")
|
"""Get the homekey for a shade."""
|
||||||
if shades_resp.status_code != 200:
|
try:
|
||||||
raise Exception('Unable to get list of shades')
|
shades_exec_resp: requests.Response = requests.post(
|
||||||
|
hub + "/home/shades/exec?shades=" + ble_name,
|
||||||
|
json={"hex": create_get_shade_key_request(1).hex()},
|
||||||
|
timeout=TIMEOUT,
|
||||||
|
)
|
||||||
|
shades_exec_resp.raise_for_status()
|
||||||
|
except requests.exceptions.RequestException as ex:
|
||||||
|
print(f"Unable to send GetShadeKey {ex!s}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
result: dict = json.loads(shades_exec_resp.content)
|
||||||
|
if result.get("err") != 0 or len(result.get("responses", [])) != 1:
|
||||||
|
raise IOError("Error when attempting GetShadeKey")
|
||||||
|
response: Final[bytes] = bytes.fromhex(result["responses"][0]["hex"])
|
||||||
|
dec_resp: Final[dict[str, Any]] = decode_response(response)
|
||||||
|
if dec_resp["errorCode"] != 0:
|
||||||
|
raise ValueError("BLE errorCode is not 0")
|
||||||
|
if len(dec_resp["data"]) != 16:
|
||||||
|
raise ValueError("Expected 16 byte homekey")
|
||||||
|
return dec_resp["data"]
|
||||||
|
|
||||||
|
|
||||||
|
def main(hub: str) -> None:
|
||||||
|
"""Main function, extract homekeys from all shades."""
|
||||||
|
try:
|
||||||
|
shades_resp: requests.Response = requests.get(
|
||||||
|
hub + "/home/shades", timeout=TIMEOUT
|
||||||
|
)
|
||||||
|
shades_resp.raise_for_status()
|
||||||
|
except requests.exceptions.RequestException as ex:
|
||||||
|
print(f"Unable to get list of shades:\n\t{ex!s}")
|
||||||
|
return
|
||||||
|
|
||||||
shades = json.loads(shades_resp.content)
|
shades = json.loads(shades_resp.content)
|
||||||
print(f"Found {len(shades)} shades, interrogating")
|
print(f"Found {len(shades)} shades, interrogating")
|
||||||
for shade in shades:
|
for shade in shades:
|
||||||
name = base64.b64decode(shade['name']).decode('utf-8')
|
name: str = base64.b64decode(shade["name"]).decode("utf-8")
|
||||||
|
key: bytes = get_shade_key(hub, shade["bleName"])
|
||||||
|
|
||||||
print(f"Shade '{name}':")
|
print(f"Shade '{name}':")
|
||||||
print(f"\tBLE name: '{shade['bleName']}'")
|
print(f"\tBLE name: '{shade['bleName']}'")
|
||||||
|
|
||||||
key = get_shade_key(hub, shade['bleName'])
|
|
||||||
print(f"\tHomeKey: {key.hex()}")
|
print(f"\tHomeKey: {key.hex()}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
import argparse
|
import argparse
|
||||||
import sys
|
import sys
|
||||||
parser = argparse.ArgumentParser(description="Extract PowerView homekey from a G3 PowerView Gateway")
|
|
||||||
parser.add_argument("hub", help="URL to HUB", default="http://powerview-g3.local")
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Extract PowerView homekey from a G3 PowerView Gateway"
|
||||||
|
)
|
||||||
|
parser.add_argument("hub", nargs="?", help="URL to HUB", default=HUB)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
sys.exit(main(**vars(args)))
|
sys.exit(main(**vars(args)))
|
||||||
|
|||||||
Reference in New Issue
Block a user