Commit 039a7687 authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

component/qjs-wrapper: use queues with mutex

parent 396f115f
......@@ -9,7 +9,7 @@ parts = qjs-wrapper
[qjs-wrapper-source]
recipe = slapos.recipe.build:gitclone
repository = https://lab.nexedi.com/nexedi/qjs-wrapper.git
revision = e90dc40
revision = 2f940df
git-executable = ${git:location}/bin/git
[qjs-wrapper]
......
......@@ -46,4 +46,4 @@ md5sum = 3bbb0f80b644d86784aab99b03e88c2f
[worker]
_update_hash_filename_ = drone-scripts/worker.js.jinja2
md5sum = 6cde256965271f89c6e82f04e3c0ce5c
md5sum = be8b4f7daeb325549a48f4e98461ee19
......@@ -71,7 +71,6 @@ import { evalScript, fdopen, loadFile, open } from "std";
gwsocket_w_pipe_fd,
handleWebSocketMessage,
last_log_timestamp = 0,
last_message_timestamp_list = [],
parent = Worker.parent,
peer_dict = {},
to_send_list = [],
......@@ -259,15 +258,17 @@ import { evalScript, fdopen, loadFile, open } from "std";
switch (type) {
case "initPubsub":
initPubsub(configuration.numberOfDrones,
configuration.numberOfSubscribers);
if (initPubsub(configuration.numberOfDrones,
configuration.numberOfSubscribers
) !== 0) {
throw new Error("Failed to initialize PubSub.");
}
for (peer_id = 0; peer_id < configuration.numberOfDrones + configuration.numberOfSubscribers; peer_id++) {
peer_dict[peer_id] = new Drone(peer_id);
peer_dict[peer_id].init(peer_id);
if (peer_id < configuration.numberOfDrones) {
drone_dict[peer_id] = peer_dict[peer_id];
}
last_message_timestamp_list[peer_id] = 0;
}
parent.postMessage({type: "initialized"});
break;
......@@ -283,22 +284,21 @@ import { evalScript, fdopen, loadFile, open } from "std";
case "update":
Object.entries(peer_dict).forEach(function ([id, peer]) {
peer_message = peer.message;
if (peer_message.length > 0) {
parsed_message = JSON.parse(peer_message);
while (parsed_message.timestamp !== last_message_timestamp_list[id]) {
while (peer_message.length > 0) {
try {
parsed_message = JSON.parse(peer_message);
parsed_message.message_list.forEach(function(message) {
if (user_me.hasOwnProperty("onGetMsg")
&& [-1, user_me.id].includes(message.dest_id)) {
user_me.onGetMsg(message.content);
}
});
last_message_timestamp_list[id] = parsed_message.timestamp;
peer_message = peer.message;
if (peer_message.length > 0) {
parsed_message = JSON.parse(peer_message);
}
} catch (error) {
console.log("Failed to parse message", peer_message);
console.log(error);
console.log(error.stack);
}
peer_message = peer.message;
}
});
......@@ -307,10 +307,7 @@ import { evalScript, fdopen, loadFile, open } from "std";
user_me.onUpdate(evt.data.timestamp);
}
setMessage(JSON.stringify({
message_list: to_send_list,
timestamp: Date.now()
}));
setMessage(JSON.stringify({message_list: to_send_list}));
to_send_list = [];
if (evt.data.timestamp - last_log_timestamp >= 1000) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment