Merge from old Gitea

This commit is contained in:
Jon
2026-04-12 16:06:44 +01:00
commit aaefb2bb35
18 changed files with 492 additions and 0 deletions

50
.gitignore vendored Normal file
View File

@@ -0,0 +1,50 @@
# These are some examples of commonly ignored file patterns.
# You should customize this list as applicable to your project.
# Learn more about .gitignore:
# https://www.atlassian.com/git/tutorials/saving-changes/gitignore
# Node artifact files
node_modules/
dist/
# Compiled Java class files
*.class
# Compiled Python bytecode
*.py[cod]
# Log files
*.log
# Package files
*.jar
# Maven
target/
dist/
# JetBrains IDE
.idea/
# Unit test reports
TEST*.xml
# Generated by MacOS
.DS_Store
# Generated by Windows
Thumbs.db
# Applications
*.app
*.exe
*.war
# Large media files
*.mp4
*.tiff
*.avi
*.flv
*.mov
*.wmv

37
README.md Normal file
View File

@@ -0,0 +1,37 @@
# Elite Skills Arena Equipment - Python SDK
[![PyPI Version](https://img.shields.io/pypi/v/your-sdk-package)](https://pypi.org/project/your-sdk-package/)
[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Python Version](https://img.shields.io/badge/python-3.6%2B-blue)](https://www.python.org/)
Official Python SDK for interacting with Elite Skills Arena products. This library provides a high-level interface to configure, control, and monitor your hardware devices programmatically.
## Table of Contents
- [Installation](#installation)
- [Getting Started](#getting-started)
- [Documentation](#documentation)
- [Examples](#examples)
- [API Reference](#api-reference)
- [Contributing](#contributing)
- [License](#license)
- [Support](#support)
## Installation
Install the SDK using pip:
```bash
pip install -e .
```
#### how ?
* connection manager - control list of machine connections and states
* machines - a machine object representing an ESA machine
* commands - websocket commands sent to the machine
### why?
The current state of websocket commands requires careful execution and forethought.
We can make this easier by abstracting into an SDK.
* user class - user / player
* machine class - A machine and its connection. State is stored on machine.
* game class - A game instance, to be uploaded and played.

View File

@@ -0,0 +1,7 @@
import json
import websockets
from typing import Dict, Any
import os

3
esasdk/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .machine import Machine
__all__ = ['Machine']

0
esasdk/commands.py Normal file
View File

9
esasdk/encoding.py Normal file
View File

@@ -0,0 +1,9 @@
We made a really bad encoding system....
Character Substitution
space _S
comma _C
_ _U
line feed ('\n', 0xA) _L
carriage return ('\r', 0xD) _R
vertical line | _V

0
esasdk/gamesList.json Normal file
View File

90
esasdk/machine.py Normal file
View File

@@ -0,0 +1,90 @@
import json
import websockets
from typing import Dict, Any
import os
class Machine:
def __init__(self, uri: str):
self.uri = uri
self.websocket = None
self.connected = False
async def connect(self) -> None:
""" connect to the websocket of the machine via uri passed to object init """
self.websocket = await websockets.connect(self.uri)
self.connected = True
if os.getenv("DEBUG"): print(f"Connected to {self.uri}")
async def _send_command(self, command: str, data: Dict = None) -> Dict:
"""Helper method to send commands and receive responses"""
if not self.connected: raise ConnectionError("Not connected to machine")
message = {"command": command}
if data: message.update(data)
#await self.websocket.send(json.dumps(message)) # TODO: JSON support.
#return json.loads(response)
if os.getenv("DEBUG")=='1': print(f'Tx: {message}')
await self.websocket.send(command)
response = await self.websocket.recv()
if os.getenv("DEBUG")=='1': print(f'Rx: {response}')
return response # TODO: make this JSON response
async def _await_message(self, message: str) -> Dict:
"""Helper method to wait for a specific received message """
if not self.connected: raise ConnectionError("Not connected to machine")
response = ""
if os.getenv("DEBUG"): print(f"AWAIT: awaiting message: {message}")
while True:
response = await self.websocket.recv()
if os.getenv("DEBUG"): print(f"Rx: {response}")
if message in response: break
return response
async def _send_gamescript(self, command: str, data: Dict = None) -> Dict:
""" Send a game script file to the machine, add code and restart service """
if not self.connected: raise ConnectionError("Not connected to machine")
# get game file pointer and upload game.
await self.websocket.send(message)
response = await self.websocket.recv()
return response
async def info(self) -> Dict[str, Any]: return await self._send_command("info")
async def name(self) -> Dict[str, Any]: return await self._send_command("NAM")
async def size(self) -> Dict[str, Any]: return await self._send_command("RNP")
async def software_version(self, panel: str="") -> Dict[str, Any]: return await self._send_command(f"VER {panel}" if panel else "VER")
async def log(self, log: str="") -> Dict[str, Any]: return await self._send_command(f"LOG {log}" if log else "LOG")
async def log_enable(self, log: str, state: bool) -> Dict[str, Any]: return await self._send_command(f"LOG {log}=on" if state else f"LOG {log}=off")
async def power(self) -> Dict[str, Any]: return await self._send_command("#P0-P VTG")
async def poweroff_timer_enable(self, state: bool = True) -> Dict[str, Any]: return await self._send_command("POW EnableAutoOff" if state else "POW KeepOn")
async def power_source(self) -> Dict[str, Any]: return await self._send_command("#P0-P STA")
async def poweroff(self) -> Dict[str, Any]: return await self._send_command("POW ShutDown")
async def volume(self, volume: int = None) -> Dict[str, Any]:
if volume is not None and not 1 <= volume <= 100: raise ValueError("Volume must be between 1 and 100")
return await self._send_command(f"VOL {volume}" if volume is not None else "VOL")
async def brightness(self, brightness: int = None) -> Dict[str, Any]:
if brightness is not None and not 1 <= brightness <= 100: raise ValueError("Brightness must be between 1 and 100")
return await self._send_command(f"BRI {brightness}" if brightness else "BRI")
async def hardware_button_enable(self, button: int, state: bool) -> Dict[str, Any]:
if button is not None and not 1 <= button <= 3: raise ValueError("Button index must be between 1 and 3")
return await self._send_command(f"BUT {button} 1" if state else f"BUT {button} 0")
async def game_start(self, gameCode, user=0, countdown=5, duration=60) -> Dict[str, Any]: return await self._send_command(f"GST {gameCode} u{user},c{countdown},t{duration}")
async def game_stop(self) -> Dict[str, Any]: return await self._send_command("GST")
async def game_stop_signal(self) -> Dict[str, Any]: return await self._await_message("STOPPED") # Check messages for STOPPED
async def game_restart(self) -> Dict[str, Any]: return await self._send_command("GST LAST")
async def game_update(self) -> Dict[str, Any]: return await self._send_command("GUP")
async def game_scoreboard_detail(self) -> Dict[str, Any]: return await self._send_command("SBD")
async def login_NFC(self, NFC_UID: str) -> Dict[str, Any]: return await self._send_command(f"NFC {NFC_UID}")
async def forceUserlistDownload(self) -> Dict[str, Any]: # force the list of users in the database to be refreshed.
await self._send_command("UPL forceUserListUpdate") # start the download.
return await self._await_message("MPU userlist write finished") # detect when download complete.
async def close(self) -> None:
if self.connected and self.websocket:
await self.websocket.close()
self.connected = False

27
examples/NFC.py Normal file
View File

@@ -0,0 +1,27 @@
import asyncio
import time
from esasdk import Machine
async def main():
# Connect to a machine
machine = Machine("ws://localhost:5424")
await machine.connect()
# send NFC UID
login = await machine.login_NFC("048CB772AC6D81")
loginInfo = login.split(" ")
userID = loginInfo[-1]
# start a game as jon
gameCode = "MAES"
countdown = 5
duration = 15
await machine.game_start(gameCode, userID, countdown, duration)
print(f"INFO: running {gameCode}")
# wait a stop event
await machine._await_message("STOPPED")
print("INFO: game stopped")
# Close connection
await machine.close()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,62 @@
import asyncio
from esasdk import Machine
######## Custom command broadcast to multiple machines ########
async def broadcast_command(machines, command):
return await asyncio.gather(
*[m._send_command(command) for m in machines]
)
######## Different commands to different machines ########
async def mixed_operations():
m1 = Machine("ws://worker1")
m2 = Machine("ws://supervisor")
await asyncio.gather(
m1.connect(),
m2.connect()
)
results = await asyncio.gather(
m1.poweroff(),
m2.info()
)
await asyncio.gather(
m1.close(),
m2.close()
)
################### MAIN EXAMPLE ################
async def manage_machine(machine: Machine):
"""Handle connection and commands for a single machine"""
try:
await machine.connect()
print(f"Connected to {machine.uri}")
# Get machine info
info = await machine.info()
print(f"{machine.uri} info: {info}")
# Power off the machine
result = await machine.poweroff()
print(f"{machine.uri} poweroff result: {result}")
except Exception as e:
print(f"Error with {machine.uri}: {str(e)}")
finally:
await machine.close()
async def main():
# Create multiple machine connections
machines = [
Machine("ws://machine1.example.com:8765"),
Machine("ws://machine2.example.com:8765"),
Machine("ws://machine3.example.com:8765")
]
# Run all machine operations concurrently
await asyncio.gather(*[manage_machine(m) for m in machines])
if __name__ == "__main__":
asyncio.run(main())

47
examples/example.py Normal file
View File

@@ -0,0 +1,47 @@
import asyncio
import time
from esasdk import Machine
async def main():
# Connect to a machine
machine = Machine("ws://localhost:5424")
await machine.connect()
# Get machine info
name = await machine.name()
print(f"Machine Name: {name}")
# Get machine info
info = await machine.info()
print(f"Machine Info: {info}")
# Get machine size
size = await machine.size()
print(f"Machine Size: {size}")
# get versions
sw = await machine.software_version()
print(f"SW: {sw}")
sw1 = await machine.software_version(1)
print(f"SW1: {sw1}")
# Get machine volume
volume = await machine.volume()
print(f"Machine Volume: {volume}")
# Set machine volume
volume = await machine.volume(50)
print(f"Machine Volume: {volume}")
# Get machine brightness
brightness = await machine.brightness()
print(f"Machine Brightness: {brightness}")
# Get machine power
power = await machine.power()
print(f"Machine Power (Volts): {power}")
# Close connection
await machine.close()
if __name__ == "__main__":
asyncio.run(main())

36
examples/game.py Normal file
View File

@@ -0,0 +1,36 @@
import asyncio
import time
from esasdk import Machine
async def main():
# Connect to a machine
machine = Machine("ws://localhost:5424")
await machine.connect()
# start a game
gameCode = "MAES"
await machine.game_start(gameCode)
print(f"running {gameCode}")
# wait a bit and stop the game
time.sleep(15)
await machine.game_stop()
print("game stopped forcefully")
# start a game as jon
gameCode = "MAES"
user = "16566"
countdown = 15
duration = 15
await machine.game_start(gameCode, user, countdown, duration)
print(f"running {gameCode}")
# wait for game stopped signal to know the game has stopped.
await machine.game_stop_signal()
print("game stopped naturally")
# Close connection
await machine.close()
if __name__ == "__main__":
asyncio.run(main())

33
examples/game_events.py Normal file
View File

@@ -0,0 +1,33 @@
#TODO: This needs IMP to be implemented as a websocket message not just a control panel message
import asyncio
import time
from esasdk import Machine
async def main():
# Connect to a machine
machine = Machine("ws://localhost:5424")
await machine.connect()
# start a game
gameCode = "MAES"
countdown = 5
duration = 15
await machine.game_start(gameCode, 0, countdown, duration)
print(f"running {gameCode}")
# wait for impact event - TODO: use 3491 socket or improve websocket?
await machine._await_message("IMP")
print("Impact!")
# program in game events like seconds remaining and hits/misses.
# wait for game stopped signal to know the game has stopped.
#await machine.game_stop()
await machine.game_stop_signal()
# Close connection
await machine.close()
if __name__ == "__main__":
asyncio.run(main())

26
examples/logs.py Normal file
View File

@@ -0,0 +1,26 @@
import asyncio
import time
from esasdk import Machine
async def main():
# Connect to a machine
machine = Machine("ws://localhost:5424")
await machine.connect()
# Get machine logs
logs = await machine.log()
print(f"Machine logs: {logs}")
# Get machine log state - not implemented in cpp yet.
#log = await machine.log('GameBase')
#print(f"Machine logs: {log}")
# Set machine log state - TODO: reply message is not implemented in cpp yet.
log = await machine.log_enabled('GameBase', True)
#print(f"Machine logs: {log}")
# Close connection
await machine.close()
if __name__ == "__main__":
asyncio.run(main())

26
examples/power.py Normal file
View File

@@ -0,0 +1,26 @@
import asyncio
import time
from esasdk import Machine
async def main():
# Connect to a machine
machine = Machine("ws://localhost:5424")
await machine.connect()
# Get machine power
power = await machine.power()
print(f"Machine Power (Volts): {power}")
# Get machine power source
powerSource = await machine.power_source()
print(f"Machine Power Source: {powerSource}")
# Set machine power timer - TODO: cpp response for this call.
powerTimer = await machine.poweroff_timer_enable(False)
print(f"Machine Power Timer: {powerTimer}")
# Close connection
await machine.close()
if __name__ == "__main__":
asyncio.run(main())

16
examples/test.py Normal file
View File

@@ -0,0 +1,16 @@
import asyncio
import time
from esasdk import Machine
async def main():
# Connect to a machine
machine = Machine("ws://localhost:5424")
await machine.connect()
await machine.forceUserlistDownload()
# Close connection
await machine.close()
if __name__ == "__main__":
asyncio.run(main())

1
license Normal file
View File

@@ -0,0 +1 @@
Closed Source

22
setup.py Normal file
View File

@@ -0,0 +1,22 @@
from setuptools import setup, find_packages
setup(
name='esasdk',
version='0.1.0',
packages=find_packages(),
install_requires=[
'websockets>=10.0',
],
author='Elite Skills Arena',
author_email='tech@eliteskillsarena.com',
description='ESA SDK for machine control via WebSockets',
keywords='websocket machine control',
long_description=open('README.md').read(),
long_description_content_type='text/markdown',
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
],
python_requires='>=3.7',
)