Commit 16d3c1ec authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

software/js-drone: use WebSocket for subscriber

parent 7d625be4
...@@ -14,15 +14,15 @@ ...@@ -14,15 +14,15 @@
# not need these here). # not need these here).
[instance-profile] [instance-profile]
filename = instance.cfg filename = instance.cfg
md5sum = 360b58007c25727b7bd8a9154d5cafd4 md5sum = c15e4e308d5669cd87258c841714244f
[instance-default] [instance-default]
filename = instance-default.cfg filename = instance-default.cfg
md5sum = 1daf69cb5d9fbb3753a117d5b87ef8a9 md5sum = 9a41a0902663fb87d52bc35a03dbcc49
[instance-drone] [instance-drone]
filename = instance-drone.cfg filename = instance-drone.cfg
md5sum = 1ff50063f5a54712a0bc0ff38fa74630 md5sum = b1910416ffa6f2b6a84fd13cca6109ce
[main] [main]
filename = main.js filename = main.js
...@@ -34,4 +34,4 @@ md5sum = 1555496ad591a31a845f33488d5c335d ...@@ -34,4 +34,4 @@ md5sum = 1555496ad591a31a845f33488d5c335d
[worker] [worker]
filename = worker.js filename = worker.js
md5sum = f201266b4607ca08d647886afd1e5c96 md5sum = 33f09ad15e70230054f00778e6f53f38
...@@ -37,7 +37,7 @@ config-flightScript = {{ flight_script }} ...@@ -37,7 +37,7 @@ config-flightScript = {{ flight_script }}
{% else -%} {% else -%}
{% do subscriber_id_list.append(id) %} {% do subscriber_id_list.append(id) %}
config-isADrone = {{ dumps(False) }} config-isADrone = {{ dumps(False) }}
config-flightScript = https://lab.nexedi.com/nexedi/flight-scripts/raw/master/subscribe.js config-flightScript = https://lab.nexedi.com/nexedi/flight-scripts/raw/api_update/subscribe.js
{% endif -%} {% endif -%}
config-multicastIp = {{ multicast_ip }} config-multicastIp = {{ multicast_ip }}
config-netIf = {{ net_if }} config-netIf = {{ net_if }}
......
...@@ -19,6 +19,7 @@ template = ${buildout:directory}/$${:_buildout_section_name_}.js ...@@ -19,6 +19,7 @@ template = ${buildout:directory}/$${:_buildout_section_name_}.js
extra-context = extra-context =
context = context =
import json_module json import json_module json
raw gwsocket_bin ${gwsocket:location}/bin/gwsocket
raw qjs_wrapper ${qjs-wrapper:location}/lib/libqjswrapper.so raw qjs_wrapper ${qjs-wrapper:location}/lib/libqjswrapper.so
raw configuration {{ configuration }} raw configuration {{ configuration }}
$${:extra-context} $${:extra-context}
...@@ -48,3 +49,6 @@ init = ...@@ -48,3 +49,6 @@ init =
[publish-connection-information] [publish-connection-information]
recipe = slapos.cookbook:publish.serialised recipe = slapos.cookbook:publish.serialised
instance-path = $${directory:home} instance-path = $${directory:home}
{% if not is_a_drone -%}
websocket-url = ws://[{{ websocket_ip }}]:{{ websocket_port }}
{% endif -%}
...@@ -31,27 +31,42 @@ extensions = jinja2.ext.do ...@@ -31,27 +31,42 @@ extensions = jinja2.ext.do
context = context =
key slapparameter_dict slap-configuration:configuration key slapparameter_dict slap-configuration:configuration
[instance-drone] [directory]
<= dynamic-template-base recipe = slapos.cookbook:mkdirectory
context = home = $${buildout:directory}
key configuration drone-configuration:output etc = $${:home}/etc
key user-script user:destination
[user]
recipe = slapos.recipe.build:download
url = $${slap-configuration:configuration.flightScript}
destination = $${directory:etc}/user.js
offline = false
[gwsocket-port]
recipe = slapos.cookbook:free_port
minimum = 6789
maximum = 6799
ip = $${slap-configuration:ipv6-random}
[drone-configuration] [drone-configuration]
recipe = slapos.recipe.template:jinja2 recipe = slapos.recipe.template:jinja2
output = $${directory:etc}/configuration.json output = $${directory:etc}/configuration.json
extensions = jinja2.ext.do
context = context =
import json_module json import json_module json
key websocket_ip gwsocket-port:ip
key websocket_port gwsocket-port:port
key slapparameter_dict slap-configuration:configuration key slapparameter_dict slap-configuration:configuration
inline = {{ json_module.dumps(slapparameter_dict) }} inline =
{% do slapparameter_dict.__setitem__('websocketIp', websocket_ip) -%}
[user] {% do slapparameter_dict.__setitem__('websocketPort', websocket_port) -%}
recipe = slapos.recipe.build:download {{ json_module.dumps(slapparameter_dict) }}
url = $${slap-configuration:configuration.flightScript}
destination = $${directory:etc}/user.js
offline = false
[directory] [instance-drone]
recipe = slapos.cookbook:mkdirectory <= dynamic-template-base
home = $${buildout:directory} depends = $${user:recipe}
etc = $${:home}/etc context =
key configuration drone-configuration:output
key is_a_drone slap-configuration:configuration.isADrone
key websocket_ip gwsocket-port:ip
key websocket_port gwsocket-port:port
...@@ -3,6 +3,7 @@ extends = ...@@ -3,6 +3,7 @@ extends =
buildout.hash.cfg buildout.hash.cfg
../../stack/slapos.cfg ../../stack/slapos.cfg
../../component/qjs-wrapper/buildout.cfg ../../component/qjs-wrapper/buildout.cfg
../../component/gwsocket/buildout.cfg
parts = parts =
instance-profile instance-profile
......
...@@ -43,7 +43,8 @@ setup(name=name, ...@@ -43,7 +43,8 @@ setup(name=name,
install_requires=[ install_requires=[
'slapos.core', 'slapos.core',
'slapos.libnetworkcache', 'slapos.libnetworkcache',
'erp5.util' 'erp5.util',
'websocket-client',
], ],
zip_safe=True, zip_safe=True,
test_suite='test', test_suite='test',
......
...@@ -32,6 +32,7 @@ import socket ...@@ -32,6 +32,7 @@ import socket
import struct import struct
import subprocess import subprocess
import time import time
import websocket
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
...@@ -105,7 +106,8 @@ SPEED_ARRAY_TYPE = 10 #float ...@@ -105,7 +106,8 @@ SPEED_ARRAY_TYPE = 10 #float
SPEED_ARRAY_VALUES = (-72.419998, 15.93, -0.015) SPEED_ARRAY_VALUES = (-72.419998, 15.93, -0.015)
STRING_TYPE = 12 STRING_TYPE = 12
TEST_MESSAGE = b'{"content":"{\\"next_checkpoint\\":1}","dest_id":-1}' MESSAGE_CONTENT = b'{\\"next_checkpoint\\":1}'
TEST_MESSAGE = b'{"content":"' + MESSAGE_CONTENT + b'","dest_id":-1}'
setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass( setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(
os.path.abspath( os.path.abspath(
...@@ -143,13 +145,22 @@ class JSDroneTestCase(SlapOSInstanceTestCase): ...@@ -143,13 +145,22 @@ class JSDroneTestCase(SlapOSInstanceTestCase):
quickjs_bin, quickjs_bin,
os.path.join(script_dir, MAIN_SCRIPT_NAME), os.path.join(script_dir, MAIN_SCRIPT_NAME),
os.path.join(script_dir, USER_SCRIPT_NAME), os.path.join(script_dir, USER_SCRIPT_NAME),
], ]
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
) )
time.sleep(0.1) self.websocket_server_address = json.loads(
subscriber_partition.getConnectionParameterDict()['_'])['websocket-url']
time.sleep(0.5)
def tearDown(self): def tearDown(self):
ws = websocket.WebSocket()
ws.connect(self.websocket_server_address, timeout=5)
try:
ws.send("quit")
except websocket.WebSocketTimeoutException:
pass
finally:
ws.close()
time.sleep(0.1)
if self.qjs_process.returncode == None: if self.qjs_process.returncode == None:
self.qjs_process.kill() self.qjs_process.kill()
self.qjs_process.communicate() self.qjs_process.communicate()
...@@ -260,7 +271,7 @@ class JSDroneTestCase(SlapOSInstanceTestCase): ...@@ -260,7 +271,7 @@ class JSDroneTestCase(SlapOSInstanceTestCase):
'id': 1, 'id': 1,
'isASimulation': False, 'isASimulation': False,
'isADrone': False, 'isADrone': False,
'flightScript': 'https://lab.nexedi.com/nexedi/flight-scripts/raw/master/subscribe.js', 'flightScript': 'https://lab.nexedi.com/nexedi/flight-scripts/raw/api_update/subscribe.js',
'netIf': OPC_UA_NET_IF, 'netIf': OPC_UA_NET_IF,
'multicastIp': MCAST_GRP 'multicastIp': MCAST_GRP
} }
...@@ -281,14 +292,24 @@ class JSDroneTestCase(SlapOSInstanceTestCase): ...@@ -281,14 +292,24 @@ class JSDroneTestCase(SlapOSInstanceTestCase):
self.assertIn(expected_string, f.readlines()) self.assertIn(expected_string, f.readlines())
def test_pubsub_subscription(self): def test_pubsub_subscription(self):
ws = websocket.WebSocket()
ws.connect(self.websocket_server_address, timeout=5)
self.assertEqual(
ws.recv_frame().data,
b''.join((
b'{"drone_dict":{"0":{"latitude":',
b'"%.6f","longitude":"%.6f","altitude":"%.2f",' % (0, 0, 0),
b'"yaw":"%.2f","speed":"%.2f","climbRate":"%.2f"}}}' % (0, 0, 0),
))
)
self.send_ua_networkMessage() self.send_ua_networkMessage()
time.sleep(0.1) time.sleep(0.1)
outs, _ = self.qjs_process.communicate(b'q\n', timeout=15) self.assertEqual(ws.recv_frame().data, MESSAGE_CONTENT.replace(b'\\', b''))
decoded_out = outs.decode() self.assertEqual(
for line in ( ws.recv_frame().data,
'Subscription 0 | MonitoredItem %s' % MONITORED_ITEM_NB, b''.join((
'Received position of drone 0: %f° %f° %fm %fm' % POSITION_ARRAY_VALUES, b'{"drone_dict":{"0":{"latitude":',
'Received speed of drone 0: %f° %fm/s %fm/s' % SPEED_ARRAY_VALUES, b'"%.6f","longitude":"%.6f","altitude":"%.2f",' % POSITION_ARRAY_VALUES[:-1],
'Received message for drone 0: %s' % TEST_MESSAGE.decode(), b'"yaw":"%.2f","speed":"%.2f","climbRate":"%.2f"}}}' % SPEED_ARRAY_VALUES,
): ))
self.assertIn(line, decoded_out) )
...@@ -19,32 +19,48 @@ import { ...@@ -19,32 +19,48 @@ import {
setMessage, setMessage,
setTargetCoordinates setTargetCoordinates
} from {{ json_module.dumps(qjs_wrapper) }}; } from {{ json_module.dumps(qjs_wrapper) }};
import * as std from "std"; import {
import { Worker } from "os"; SIGTERM,
WNOHANG,
Worker,
close,
exec,
kill,
pipe,
setReadHandler,
waitpid
} from "os";
import { evalScript, exit, fdopen, loadFile, open } from "std";
(function (console, getAltitude, getAltitudeRel, getInitialAltitude, (function (Drone, SIGTERM, WNOHANG, Worker, close, console, evalScript, exec,
getLatitude, getLongitude, getYaw, initPubsub, loiter, exit, fdopen, getAltitude, getAltitudeRel, getInitialAltitude,
setAirspeed, setMessage, setTargetCoordinates, std, triggerParachute, getLatitude, getLongitude, getYaw, initPubsub, kill, loadFile,
Drone, Worker) { loiter, open, pipe, setAirspeed, setMessage, setReadHandler,
setTargetCoordinates, triggerParachute, waitpid) {
// Every script is evaluated per drone // Every script is evaluated per drone
"use strict"; "use strict";
var CONF_PATH = {{ json_module.dumps(configuration) }}, var CONF_PATH = {{ json_module.dumps(configuration) }},
conf_file = std.open(CONF_PATH, "r"), conf_file = open(CONF_PATH, "r"),
configuration = JSON.parse(conf_file.readAsString()), configuration = JSON.parse(conf_file.readAsString()),
clientId,
gwsocket_pid,
gwsocket_r_pipe_fd,
gwsocket_w_pipe_fd,
handleWebSocketMessage,
last_message_timestamp = 0, last_message_timestamp = 0,
parent = Worker.parent, parent = Worker.parent,
peer_dict = {}, peer_dict = {},
user_me = { user_me = {
//for debugging purpose
fdopen: std.fdopen,
in: std.in,
//required to fly //required to fly
triggerParachute: triggerParachute, triggerParachute: triggerParachute,
drone_dict: {}, drone_dict: {},
exit: function (exit_code) { exit: function (exit_code) {
parent.postMessage({type: "exited", exit: exit_code}); parent.postMessage({type: "exited", exit: exit_code});
parent.onmessage = null; parent.onmessage = null;
if (user_me.hasOwnProperty("onWebSocketMessage")) {
stopGwsocket();
}
}, },
getAltitudeAbs: getAltitude, getAltitudeAbs: getAltitude,
getCurrentPosition: function () { getCurrentPosition: function () {
...@@ -73,22 +89,108 @@ import { Worker } from "os"; ...@@ -73,22 +89,108 @@ import { Worker } from "os";
}; };
conf_file.close(); conf_file.close();
function readMessage(rd) {
function read4() {
var b1, b2, b3, b4;
b1 = rd.getByte();
b2 = rd.getByte();
b3 = rd.getByte();
b4 = rd.getByte();
return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
}
clientId = read4();
var type = read4();
var len = read4();
var data = new ArrayBuffer(len);
rd.read(data, 0, len);
return {
client: clientId,
type: type,
data: String.fromCharCode.apply(null, new Uint8Array(data)).trim()
};
}
function writeMessage(wr, m) {
function write4(v) {
wr.putByte((v >> 24) & 0xFF);
wr.putByte((v >> 16) & 0xFF);
wr.putByte((v >> 8) & 0xFF);
wr.putByte(v & 0xFF);
}
write4(m.client);
write4(m.type);
write4(m.data.byteLength);
wr.write(m.data, 0, m.data.byteLength);
wr.flush();
}
function runGwsocket(onMessage) {
var gwsocket_w_pipe = pipe(),
gwsocket_r_pipe = pipe();
gwsocket_pid = exec([
"gwsocket",
"--port=" + configuration.websocketPort,
"--addr=" + configuration.websocketIp,
"--std",
"--strict"
], {
block: false,
usePath: false,
file: {{ json_module.dumps(gwsocket_bin) }},
stdin: gwsocket_w_pipe[0],
stdout: gwsocket_r_pipe[1]
});
gwsocket_w_pipe_fd = fdopen(gwsocket_w_pipe[1], "w");
gwsocket_r_pipe_fd = fdopen(gwsocket_r_pipe[0], "r");
handleWebSocketMessage = function () {
var message = readMessage(gwsocket_r_pipe_fd).data;
if (message.includes(configuration.websocketIp)) {
return;
}
onMessage(message);
};
user_me.writeWebsocketMessage = function (message) {
var buf = new ArrayBuffer(message.length);
var bufView = new Uint8Array(buf);
for (var i=0; i<message.length; i++) {
bufView[i] = message.charCodeAt(i);
}
writeMessage(gwsocket_w_pipe_fd, {client: clientId, type: 1, data: buf});
}
setReadHandler(gwsocket_r_pipe[0], handleWebSocketMessage);
}
function stopGwsocket() {
handleWebSocketMessage = null;
close(gwsocket_w_pipe_fd);
close(gwsocket_r_pipe_fd);
kill(gwsocket_pid, SIGTERM);
waitpid(gwsocket_pid, WNOHANG);
}
function loadUserScript(path) { function loadUserScript(path) {
var script_content = std.loadFile(path); var script_content = loadFile(path);
if (script_content === null) { if (script_content === null) {
console.log("Failed to load user script " + path); console.log("Failed to load user script " + path);
std.exit(1); exit(1);
} }
try { try {
std.evalScript( evalScript(
"function execUserScript(from, me) {" + script_content + "};" "function execUserScript(from, me) {" + script_content + "};"
); );
} catch (e) { } catch (e) {
console.log("Failed to evaluate user script", e); console.log("Failed to evaluate user script", e);
std.exit(1); exit(1);
} }
execUserScript(null, user_me); execUserScript(null, user_me);
if (user_me.hasOwnProperty("onWebSocketMessage")) {
runGwsocket(user_me.onWebSocketMessage);
}
// Call the drone onStart function // Call the drone onStart function
if (user_me.hasOwnProperty("onStart")) { if (user_me.hasOwnProperty("onStart")) {
user_me.onStart(); user_me.onStart();
...@@ -141,9 +243,11 @@ import { Worker } from "os"; ...@@ -141,9 +243,11 @@ import { Worker } from "os";
// Catch all potential bug to exit the main process // Catch all potential bug to exit the main process
// if it occurs // if it occurs
console.log(error); console.log(error);
std.exit(1); exit(1);
} }
}; };
}(console, getAltitude, getAltitudeRel, getInitialAltitude, getLatitude, }(Drone, SIGTERM, WNOHANG, Worker, close, console, evalScript, exec,
getLongitude, getYaw, initPubsub, loiter, setAirspeed, setMessage, exit, fdopen, getAltitude, getAltitudeRel, getInitialAltitude,
setTargetCoordinates, std, triggerParachute, Drone, Worker)); getLatitude, getLongitude, getYaw, initPubsub, kill, loadFile,
loiter, open, pipe, setAirspeed, setMessage, setReadHandler,
setTargetCoordinates, triggerParachute, waitpid));
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment