91 lines
5.2 KiB
Python
91 lines
5.2 KiB
Python
import json
import os
from typing import Any, Dict, Optional

import websockets
|
|
|
|
class Machine:
    """Asynchronous client for a machine that is controlled over a websocket.

    Every command is sent as a plain-text frame and the next frame received
    on the socket is returned as the reply.  The return annotations already
    say ``Dict`` in anticipation of the planned JSON protocol (see the TODOs
    in ``_send_command``); today the raw frame is returned unchanged.

    Set the ``DEBUG`` environment variable to a non-empty value other than
    ``"0"`` to print connection, Tx and Rx traces to stdout.

    The class can be used as an async context manager::

        async with Machine(uri) as machine:
            await machine.info()
    """

    def __init__(self, uri: str):
        """Store the websocket *uri*; no connection is opened until ``connect()``."""
        self.uri = uri
        self.websocket = None  # populated by connect(), cleared by close()
        self.connected = False

    async def __aenter__(self) -> "Machine":
        """Connect on entering ``async with`` and return this instance."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the connection on leaving ``async with``."""
        await self.close()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _debug(text: str) -> None:
        """Print *text* when debugging is enabled through the DEBUG env var."""
        # One shared rule for every trace: previously some call sites needed
        # DEBUG == '1' while others accepted any value (including "0").
        if os.getenv("DEBUG", "") not in ("", "0"):
            print(text)

    def _require_connection(self) -> None:
        """Raise ``ConnectionError`` unless ``connect()`` has succeeded."""
        if not self.connected or self.websocket is None:
            raise ConnectionError("Not connected to machine")

    async def connect(self) -> None:
        """Connect to the websocket of the machine via the uri passed to init."""
        self.websocket = await websockets.connect(self.uri)
        self.connected = True
        self._debug(f"Connected to {self.uri}")

    async def _send_command(self, command: str, data: Optional[Dict] = None) -> Dict:
        """Send *command* and return the next frame received.

        Args:
            command: Text sent to the machine verbatim.
            data: Extra fields for the future JSON envelope.  They currently
                appear only in the debug trace and are NOT transmitted.

        Returns:
            The raw frame returned by ``websocket.recv()``.

        Raises:
            ConnectionError: If the machine is not connected.
        """
        self._require_connection()

        # The envelope is built now so the debug trace already shows what the
        # JSON protocol will carry.
        message = {"command": command}
        if data:
            message.update(data)
        self._debug(f"Tx: {message}")

        # TODO: JSON support -- send json.dumps(message) instead of the bare
        # command and return json.loads(response).
        await self.websocket.send(command)
        # NOTE(review): the reply is simply the next frame on the socket; an
        # unsolicited frame would be mistaken for it -- confirm the protocol.
        response = await self.websocket.recv()
        self._debug(f"Rx: {response}")

        return response  # TODO: make this JSON response

    async def _await_message(self, message: str) -> Dict:
        """Receive frames until one contains *message* and return that frame.

        Frames that do not contain *message* are discarded.  There is no
        timeout: the call waits for as long as it takes.

        Raises:
            ConnectionError: If the machine is not connected.
        """
        self._require_connection()
        self._debug(f"AWAIT: awaiting message: {message}")
        while True:
            response = await self.websocket.recv()
            self._debug(f"Rx: {response}")
            if message in response:
                return response

    async def _send_gamescript(self, command: str, data: Optional[Dict] = None) -> Dict:
        """Send a game script command to the machine and return the reply.

        The intended behaviour is to upload a game script file, add the code
        and restart the service; the upload itself is not implemented yet, so
        this currently behaves like ``_send_command``.

        Raises:
            ConnectionError: If the machine is not connected.
        """
        # TODO: get game file pointer and upload game.
        # Previously this sent an undefined name and always raised NameError.
        return await self._send_command(command, data)

    # ------------------------------------------------------------------
    # Machine information and settings
    # ------------------------------------------------------------------

    async def info(self) -> Dict[str, Any]:
        """Send ``info`` and return the reply."""
        return await self._send_command("info")

    async def name(self) -> Dict[str, Any]:
        """Send ``NAM`` and return the reply."""
        return await self._send_command("NAM")

    async def size(self) -> Dict[str, Any]:
        """Send ``RNP`` and return the reply."""
        return await self._send_command("RNP")

    async def software_version(self, panel: str = "") -> Dict[str, Any]:
        """Send ``VER``, or ``VER <panel>`` when *panel* is non-empty."""
        return await self._send_command(f"VER {panel}" if panel else "VER")

    async def log(self, log: str = "") -> Dict[str, Any]:
        """Send ``LOG``, or ``LOG <log>`` when *log* is non-empty."""
        return await self._send_command(f"LOG {log}" if log else "LOG")

    async def log_enable(self, log: str, state: bool) -> Dict[str, Any]:
        """Send ``LOG <log>=on`` when *state* is true, else ``LOG <log>=off``."""
        return await self._send_command(f"LOG {log}=on" if state else f"LOG {log}=off")

    async def power(self) -> Dict[str, Any]:
        """Send ``#P0-P VTG`` and return the reply."""
        return await self._send_command("#P0-P VTG")

    async def poweroff_timer_enable(self, state: bool = True) -> Dict[str, Any]:
        """Send ``POW EnableAutoOff`` when *state* is true, else ``POW KeepOn``."""
        return await self._send_command("POW EnableAutoOff" if state else "POW KeepOn")

    async def power_source(self) -> Dict[str, Any]:
        """Send ``#P0-P STA`` and return the reply."""
        return await self._send_command("#P0-P STA")

    async def poweroff(self) -> Dict[str, Any]:
        """Send ``POW ShutDown`` and return the reply."""
        return await self._send_command("POW ShutDown")

    async def volume(self, volume: Optional[int] = None) -> Dict[str, Any]:
        """Send ``VOL <volume>``, or a bare ``VOL`` when *volume* is omitted.

        Raises:
            ValueError: If *volume* is given and is outside 1..100.
        """
        if volume is not None and not 1 <= volume <= 100:
            raise ValueError("Volume must be between 1 and 100")
        return await self._send_command(f"VOL {volume}" if volume is not None else "VOL")

    async def brightness(self, brightness: Optional[int] = None) -> Dict[str, Any]:
        """Send ``BRI <brightness>``, or a bare ``BRI`` when it is omitted.

        Raises:
            ValueError: If *brightness* is given and is outside 1..100.
        """
        if brightness is not None and not 1 <= brightness <= 100:
            raise ValueError("Brightness must be between 1 and 100")
        # Explicit None test, matching volume(), rather than truthiness.
        return await self._send_command(f"BRI {brightness}" if brightness is not None else "BRI")

    async def hardware_button_enable(self, button: int, state: bool) -> Dict[str, Any]:
        """Send ``BUT <button> 1`` when *state* is true, else ``BUT <button> 0``.

        Raises:
            ValueError: If *button* is None or outside 1..3.
        """
        # None used to slip through and was sent to the machine as "BUT None".
        if button is None or not 1 <= button <= 3:
            raise ValueError("Button index must be between 1 and 3")
        return await self._send_command(f"BUT {button} 1" if state else f"BUT {button} 0")

    # ------------------------------------------------------------------
    # Games
    # ------------------------------------------------------------------

    async def game_start(self, gameCode, user=0, countdown=5, duration=60) -> Dict[str, Any]:
        """Send ``GST <gameCode> u<user>,c<countdown>,t<duration>``."""
        return await self._send_command(f"GST {gameCode} u{user},c{countdown},t{duration}")

    async def game_stop(self) -> Dict[str, Any]:
        """Send a bare ``GST`` and return the reply."""
        return await self._send_command("GST")

    async def game_stop_signal(self) -> Dict[str, Any]:
        """Wait for a frame containing ``STOPPED`` and return it."""
        return await self._await_message("STOPPED")

    async def game_restart(self) -> Dict[str, Any]:
        """Send ``GST LAST`` and return the reply."""
        return await self._send_command("GST LAST")

    async def game_update(self) -> Dict[str, Any]:
        """Send ``GUP`` and return the reply."""
        return await self._send_command("GUP")

    async def game_scoreboard_detail(self) -> Dict[str, Any]:
        """Send ``SBD`` and return the reply."""
        return await self._send_command("SBD")

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    async def login_NFC(self, NFC_UID: str) -> Dict[str, Any]:
        """Send ``NFC <NFC_UID>`` and return the reply."""
        return await self._send_command(f"NFC {NFC_UID}")

    async def forceUserlistDownload(self) -> Dict[str, Any]:
        """Force the list of users in the database to be refreshed.

        Sends ``UPL forceUserListUpdate`` to start the download, then waits
        for a frame containing ``MPU userlist write finished`` and returns it.
        """
        await self._send_command("UPL forceUserListUpdate")  # start the download
        return await self._await_message("MPU userlist write finished")  # download complete

    # ------------------------------------------------------------------
    # Teardown
    # ------------------------------------------------------------------

    async def close(self) -> None:
        """Close the websocket if connected; safe to call more than once."""
        if self.connected and self.websocket:
            try:
                await self.websocket.close()
            finally:
                # Reset even when close() raises so the object never keeps
                # claiming to be connected to a dead socket.
                self.connected = False
                self.websocket = None
|