Commit c65c6394 authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

component/qjs-wrapper: store PubSub messages in pipes

parent da40f800
...@@ -9,7 +9,7 @@ parts = qjs-wrapper ...@@ -9,7 +9,7 @@ parts = qjs-wrapper
[qjs-wrapper-source] [qjs-wrapper-source]
recipe = slapos.recipe.build:gitclone recipe = slapos.recipe.build:gitclone
repository = https://lab.nexedi.com/nexedi/qjs-wrapper.git repository = https://lab.nexedi.com/nexedi/qjs-wrapper.git
revision = e90dc40 revision = 38c9977
git-executable = ${git:location}/bin/git git-executable = ${git:location}/bin/git
[qjs-wrapper] [qjs-wrapper]
......
...@@ -46,4 +46,4 @@ md5sum = 3bbb0f80b644d86784aab99b03e88c2f ...@@ -46,4 +46,4 @@ md5sum = 3bbb0f80b644d86784aab99b03e88c2f
[worker] [worker]
_update_hash_filename_ = drone-scripts/worker.js.jinja2 _update_hash_filename_ = drone-scripts/worker.js.jinja2
md5sum = 6cde256965271f89c6e82f04e3c0ce5c md5sum = be8b4f7daeb325549a48f4e98461ee19
...@@ -71,7 +71,6 @@ import { evalScript, fdopen, loadFile, open } from "std"; ...@@ -71,7 +71,6 @@ import { evalScript, fdopen, loadFile, open } from "std";
gwsocket_w_pipe_fd, gwsocket_w_pipe_fd,
handleWebSocketMessage, handleWebSocketMessage,
last_log_timestamp = 0, last_log_timestamp = 0,
last_message_timestamp_list = [],
parent = Worker.parent, parent = Worker.parent,
peer_dict = {}, peer_dict = {},
to_send_list = [], to_send_list = [],
...@@ -259,15 +258,17 @@ import { evalScript, fdopen, loadFile, open } from "std"; ...@@ -259,15 +258,17 @@ import { evalScript, fdopen, loadFile, open } from "std";
switch (type) { switch (type) {
case "initPubsub": case "initPubsub":
initPubsub(configuration.numberOfDrones, if (initPubsub(configuration.numberOfDrones,
configuration.numberOfSubscribers); configuration.numberOfSubscribers
) !== 0) {
throw new Error("Failed to initialize PubSub.");
}
for (peer_id = 0; peer_id < configuration.numberOfDrones + configuration.numberOfSubscribers; peer_id++) { for (peer_id = 0; peer_id < configuration.numberOfDrones + configuration.numberOfSubscribers; peer_id++) {
peer_dict[peer_id] = new Drone(peer_id); peer_dict[peer_id] = new Drone(peer_id);
peer_dict[peer_id].init(peer_id); peer_dict[peer_id].init(peer_id);
if (peer_id < configuration.numberOfDrones) { if (peer_id < configuration.numberOfDrones) {
drone_dict[peer_id] = peer_dict[peer_id]; drone_dict[peer_id] = peer_dict[peer_id];
} }
last_message_timestamp_list[peer_id] = 0;
} }
parent.postMessage({type: "initialized"}); parent.postMessage({type: "initialized"});
break; break;
...@@ -283,22 +284,21 @@ import { evalScript, fdopen, loadFile, open } from "std"; ...@@ -283,22 +284,21 @@ import { evalScript, fdopen, loadFile, open } from "std";
case "update": case "update":
Object.entries(peer_dict).forEach(function ([id, peer]) { Object.entries(peer_dict).forEach(function ([id, peer]) {
peer_message = peer.message; peer_message = peer.message;
if (peer_message.length > 0) { while (peer_message.length > 0) {
parsed_message = JSON.parse(peer_message); try {
while (parsed_message.timestamp !== last_message_timestamp_list[id]) { parsed_message = JSON.parse(peer_message);
parsed_message.message_list.forEach(function(message) { parsed_message.message_list.forEach(function(message) {
if (user_me.hasOwnProperty("onGetMsg") if (user_me.hasOwnProperty("onGetMsg")
&& [-1, user_me.id].includes(message.dest_id)) { && [-1, user_me.id].includes(message.dest_id)) {
user_me.onGetMsg(message.content); user_me.onGetMsg(message.content);
} }
}); });
} catch (error) {
last_message_timestamp_list[id] = parsed_message.timestamp; console.log("Failed to parse message", peer_message);
peer_message = peer.message; console.log(error);
if (peer_message.length > 0) { console.log(error.stack);
parsed_message = JSON.parse(peer_message);
}
} }
peer_message = peer.message;
} }
}); });
...@@ -307,10 +307,7 @@ import { evalScript, fdopen, loadFile, open } from "std"; ...@@ -307,10 +307,7 @@ import { evalScript, fdopen, loadFile, open } from "std";
user_me.onUpdate(evt.data.timestamp); user_me.onUpdate(evt.data.timestamp);
} }
setMessage(JSON.stringify({ setMessage(JSON.stringify({message_list: to_send_list}));
message_list: to_send_list,
timestamp: Date.now()
}));
to_send_list = []; to_send_list = [];
if (evt.data.timestamp - last_log_timestamp >= 1000) { if (evt.data.timestamp - last_log_timestamp >= 1000) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment