Commit a07dc035840d3e9fa970ae49428a40135325bc8d

Authored by Olli-Pekka Kahilakoski
1 parent 9460b9cd
Exists in master

ADD: Script for sending events to InVesalius via Socket.IO

Showing 1 changed file with 92 additions and 0 deletions   Show diff stats
scripts/invesalius_server.py 0 → 100644
... ... @@ -0,0 +1,92 @@
  1 +#!/usr/bin/env python3
  2 +# -*- coding: utf-8 -*-
  3 +
  4 +# This scripts allows sending events to InVesalius via Socket.IO, mimicking InVesalius's
  5 +# internal communication. It can be useful for developing and debugging InVesalius.
  6 +#
  7 +# Install the requirements by running:
  8 +#
  9 +# pip install aioconsole nest-asyncio python-socketio[client] requests uvicorn[standard]
  10 +#
  11 +# Example usage:
  12 +#
  13 +# - (In console window 1) Run the script by: python scripts/invesalius_server.py 5000
  14 +#
  15 +# - (In console window 2) Run InVesalius by: python app.py --remote-host http://localhost:5000
  16 +#
  17 +# - If InVesalius connected to the server successfully, a message should appear in console window 1,
  18 +# asking to provide the topic name.
  19 +#
  20 +# - Enter the topic name, such as "Add marker" (without quotes).
  21 +#
  22 +# - Enter the data, such as {"ball_id": 0, "size": 2, "colour": [1.0, 1.0, 0.0], "coord": [10.0, 20.0, 30.0]}
  23 +#
  24 +# - If successful, a message should now appear in console window 2, indicating that the event was received.
  25 +
  26 +import asyncio
  27 +import sys
  28 +import json
  29 +
  30 +import aioconsole
  31 +import nest_asyncio
  32 +import socketio
  33 +import uvicorn
  34 +
  35 +nest_asyncio.apply()
  36 +
  37 +if len(sys.argv) != 2:
  38 + print ("""This script allows sending events to InVesalius.
  39 +
  40 +Usage: python invesalius_server.py port""")
  41 + sys.exit(1)
  42 +
  43 +port = int(sys.argv[1])
  44 +
  45 +sio = socketio.AsyncServer(async_mode='asgi')
  46 +app = socketio.ASGIApp(sio)
  47 +
  48 +connected = False
  49 +
  50 +@sio.event
  51 +def connect(sid, environ):
  52 + global connected
  53 + connected = True
  54 +
  55 +def print_json_error(e):
  56 + print("Invalid JSON")
  57 + print(e.doc)
  58 + print(" " * e.pos + "^")
  59 + print(e.msg)
  60 + print("")
  61 +
  62 +async def run():
  63 + while True:
  64 + if not connected:
  65 + await asyncio.sleep(1)
  66 + continue
  67 +
  68 + print("Enter topic: ")
  69 + topic = await aioconsole.ainput()
  70 + print("Enter data as JSON: ")
  71 + data = await aioconsole.ainput()
  72 +
  73 + try:
  74 + decoded = json.loads(data)
  75 + except json.decoder.JSONDecodeError as e:
  76 + print_json_error(e)
  77 + continue
  78 +
  79 + await sio.emit(
  80 + event="to_neuronavigation",
  81 + data={
  82 + "topic": topic,
  83 + "data": decoded,
  84 + }
  85 + )
  86 +
  87 +async def main():
  88 + asyncio.create_task(run())
  89 + uvicorn.run(app, port=port, host='0.0.0.0', loop='asyncio')
  90 +
  91 +if __name__ == '__main__':
  92 + asyncio.run(main(), debug=True)
... ...