Files
esa-python-sdk/esasdk/machine.py
2026-04-12 16:06:44 +01:00

91 lines
5.2 KiB
Python

import json
import websockets
from typing import Dict, Any
import os
class Machine:
    """Async client for a machine that is controlled over a websocket.

    Commands are sent as plain text frames and the reply frame is handed back
    to the caller unparsed. The public helpers are annotated as returning
    ``Dict[str, Any]``, but until the JSON TODO in ``_send_command`` is done
    they return whatever ``websocket.recv()`` produced.
    """

    def __init__(self, uri: str):
        """Remember the websocket *uri*; no connection is opened here."""
        self.uri = uri
        self.websocket = None
        self.connected = False

    async def connect(self) -> None:
        """Connect to the websocket of the machine via the uri passed to init."""
        self.websocket = await websockets.connect(self.uri)
        self.connected = True
        # NOTE(review): this check is "DEBUG is set to anything", while the
        # Tx/Rx prints in _send_command require DEBUG == '1'. Kept as-is.
        if os.getenv("DEBUG"):
            print(f"Connected to {self.uri}")

    async def _send_command(self, command: str, data: Dict = None) -> Dict:
        """Send *command* as a text frame and return the next received frame.

        Args:
            command: Command text written to the websocket verbatim.
            data: Optional extra fields. They are merged into the debug
                ``message`` only; they are NOT transmitted yet.

        Returns:
            The raw frame from ``websocket.recv()`` (not parsed as JSON).

        Raises:
            ConnectionError: If ``connect()`` has not succeeded.
        """
        if not self.connected:
            raise ConnectionError("Not connected to machine")
        message = {"command": command}
        if data:
            message.update(data)
        # TODO: JSON support - send json.dumps(message) and return
        # json.loads(response) once the machine speaks JSON.
        if os.getenv("DEBUG") == '1':
            print(f'Tx: {message}')
        await self.websocket.send(command)
        response = await self.websocket.recv()
        if os.getenv("DEBUG") == '1':
            print(f'Rx: {response}')
        return response  # TODO: make this JSON response

    async def _await_message(self, message: str) -> Dict:
        """Receive frames until one contains *message* and return that frame.

        Frames that do not contain *message* are discarded. There is no
        timeout: this waits for as long as ``recv()`` keeps returning.

        Raises:
            ConnectionError: If ``connect()`` has not succeeded.
        """
        if not self.connected:
            raise ConnectionError("Not connected to machine")
        # NOTE(review): truthy DEBUG check here vs. == '1' in _send_command.
        if os.getenv("DEBUG"):
            print(f"AWAIT: awaiting message: {message}")
        while True:
            response = await self.websocket.recv()
            if os.getenv("DEBUG"):
                print(f"Rx: {response}")
            if message in response:
                return response

    async def _send_gamescript(self, command: str, data: Dict = None) -> Dict:
        """Send a game script command to the machine and return its reply.

        The upload step (game file pointer, add code, restart service) is not
        implemented yet; *data* is accepted for that future use and ignored.

        Raises:
            ConnectionError: If ``connect()`` has not succeeded.
        """
        if not self.connected:
            raise ConnectionError("Not connected to machine")
        # TODO: get game file pointer and upload game.
        # Send the command text itself, the same wire format _send_command
        # uses; there is no other payload to send until upload exists.
        await self.websocket.send(command)
        return await self.websocket.recv()

    async def info(self) -> Dict[str, Any]:
        """Send ``info`` and return the reply."""
        return await self._send_command("info")

    async def name(self) -> Dict[str, Any]:
        """Send ``NAM`` and return the reply."""
        return await self._send_command("NAM")

    async def size(self) -> Dict[str, Any]:
        """Send ``RNP`` and return the reply."""
        return await self._send_command("RNP")

    async def software_version(self, panel: str = "") -> Dict[str, Any]:
        """Send ``VER``, or ``VER <panel>`` when *panel* is non-empty."""
        return await self._send_command(f"VER {panel}" if panel else "VER")

    async def log(self, log: str = "") -> Dict[str, Any]:
        """Send ``LOG``, or ``LOG <log>`` when *log* is non-empty."""
        return await self._send_command(f"LOG {log}" if log else "LOG")

    async def log_enable(self, log: str, state: bool) -> Dict[str, Any]:
        """Send ``LOG <log>=on`` when *state* is true, else ``LOG <log>=off``."""
        return await self._send_command(f"LOG {log}=on" if state else f"LOG {log}=off")

    async def power(self) -> Dict[str, Any]:
        """Send ``#P0-P VTG`` and return the reply."""
        return await self._send_command("#P0-P VTG")

    async def poweroff_timer_enable(self, state: bool = True) -> Dict[str, Any]:
        """Send ``POW EnableAutoOff`` when *state* is true, else ``POW KeepOn``."""
        return await self._send_command("POW EnableAutoOff" if state else "POW KeepOn")

    async def power_source(self) -> Dict[str, Any]:
        """Send ``#P0-P STA`` and return the reply."""
        return await self._send_command("#P0-P STA")

    async def poweroff(self) -> Dict[str, Any]:
        """Send ``POW ShutDown`` and return the reply."""
        return await self._send_command("POW ShutDown")

    async def volume(self, volume: int = None) -> Dict[str, Any]:
        """Send ``VOL <volume>``, or a bare ``VOL`` when *volume* is None.

        Raises:
            ValueError: If *volume* is given and outside 1..100.
        """
        if volume is not None and not 1 <= volume <= 100:
            raise ValueError("Volume must be between 1 and 100")
        return await self._send_command(f"VOL {volume}" if volume is not None else "VOL")

    async def brightness(self, brightness: int = None) -> Dict[str, Any]:
        """Send ``BRI <brightness>``, or a bare ``BRI`` when *brightness* is None.

        Raises:
            ValueError: If *brightness* is given and outside 1..100.
        """
        if brightness is not None and not 1 <= brightness <= 100:
            raise ValueError("Brightness must be between 1 and 100")
        # Explicit None test (not truthiness) so this mirrors volume().
        return await self._send_command(f"BRI {brightness}" if brightness is not None else "BRI")

    async def hardware_button_enable(self, button: int, state: bool) -> Dict[str, Any]:
        """Send ``BUT <button> 1`` when *state* is true, else ``BUT <button> 0``.

        Raises:
            ValueError: If *button* is not None and outside 1..3.
        """
        if button is not None and not 1 <= button <= 3:
            raise ValueError("Button index must be between 1 and 3")
        return await self._send_command(f"BUT {button} 1" if state else f"BUT {button} 0")

    async def game_start(self, gameCode, user=0, countdown=5, duration=60) -> Dict[str, Any]:
        """Send ``GST <gameCode> u<user>,c<countdown>,t<duration>``."""
        return await self._send_command(f"GST {gameCode} u{user},c{countdown},t{duration}")

    async def game_stop(self) -> Dict[str, Any]:
        """Send a bare ``GST`` and return the reply."""
        return await self._send_command("GST")

    async def game_stop_signal(self) -> Dict[str, Any]:
        """Wait for a frame containing ``STOPPED`` and return it."""
        return await self._await_message("STOPPED")  # Check messages for STOPPED

    async def game_restart(self) -> Dict[str, Any]:
        """Send ``GST LAST`` and return the reply."""
        return await self._send_command("GST LAST")

    async def game_update(self) -> Dict[str, Any]:
        """Send ``GUP`` and return the reply."""
        return await self._send_command("GUP")

    async def game_scoreboard_detail(self) -> Dict[str, Any]:
        """Send ``SBD`` and return the reply."""
        return await self._send_command("SBD")

    async def login_NFC(self, NFC_UID: str) -> Dict[str, Any]:
        """Send ``NFC <NFC_UID>`` and return the reply."""
        return await self._send_command(f"NFC {NFC_UID}")

    async def forceUserlistDownload(self) -> Dict[str, Any]:
        """Force the list of users in the database to be refreshed.

        Sends ``UPL forceUserListUpdate`` and then waits for a frame
        containing ``MPU userlist write finished``, which is returned.
        """
        await self._send_command("UPL forceUserListUpdate")  # start the download.
        return await self._await_message("MPU userlist write finished")  # detect when download complete.

    async def close(self) -> None:
        """Close the websocket if one is open and mark the client disconnected."""
        if self.connected and self.websocket:
            try:
                await self.websocket.close()
            finally:
                # Reset even when close() raises, so later calls fail with
                # ConnectionError instead of writing to a broken socket.
                self.connected = False