Commit bf812553 authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

Allow subscribers to send messages

parent 62ab1198
...@@ -5,6 +5,8 @@ ...@@ -5,6 +5,8 @@
#include "autopilot_wrapper.h" #include "autopilot_wrapper.h"
#include "pubsub.h" #include "pubsub.h"
#define MAX_VARIABLE_NB 3
struct messageNode { struct messageNode {
char *message; char *message;
struct messageNode *next; struct messageNode *next;
...@@ -63,4 +65,28 @@ VariableData droneVariableArray[] = { ...@@ -63,4 +65,28 @@ VariableData droneVariableArray[] = {
}, },
}; };
VariableStruct droneVariables = {
.nbVariable = 3,
.variableArray = droneVariableArray,
};
VariableData subscriberVariableArray[] = {
{
.name = "message",
.description = "Message to send to the other drones",
.value = &message,
.type = UA_TYPES_STRING,
.builtInType = UA_NS0ID_STRING,
.valueRank = UA_VALUERANK_SCALAR,
.arrayDimensionsSize = 0,
.arrayDimensions = NULL,
.getter.getString = get_message,
},
};
VariableStruct subscriberVariables = {
.nbVariable = 1,
.variableArray = subscriberVariableArray,
};
#endif /* __DRONEDGE_H__ */ #endif /* __DRONEDGE_H__ */
...@@ -49,21 +49,33 @@ typedef struct { ...@@ -49,21 +49,33 @@ typedef struct {
} getter; } getter;
} VariableData; } VariableData;
typedef struct {
size_t nbVariable;
VariableData *variableArray;
} VariableStruct;
typedef struct {
VariableStruct variables;
void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic);
} InstanceData;
int runPubsub(UA_String *transportProfile, int runPubsub(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl, UA_NetworkAddressUrlDataType *networkAddressUrl,
VariableData *variableArray, size_t nbVariable, VariableStruct variables, UA_UInt32 id,
UA_UInt32 id, UA_UInt32 nbReader, UA_Duration interval, InstanceData *readerArray, UA_UInt32 nbReader,
void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic), UA_UInt32 maxVariableNb, UA_Duration interval,
UA_UInt16 (*get_reader_id)(UA_UInt32 nb), UA_UInt16 (*get_reader_id)(UA_UInt32 nb),
VariableData (*get_value)(UA_String identifier), VariableData (*get_value)(UA_String identifier),
void (*update)(UA_UInt32 id, const UA_DataValue*, bool print), void (*update)(UA_UInt32 id, const UA_DataValue*, bool print),
bool publish, UA_Boolean *running); UA_Boolean *running);
UA_String get_message(void); UA_String get_message(void);
UA_UInt16 get_drone_id(UA_UInt32 nb); UA_UInt16 get_drone_id(UA_UInt32 nb);
void init_node_id(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic); void init_drone_node_id(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic);
void init_subscriber_node_id(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic);
VariableData pubsub_get_value(UA_String identifier); VariableData pubsub_get_value(UA_String identifier);
......
...@@ -10,14 +10,12 @@ UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent, ...@@ -10,14 +10,12 @@ UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent,
readerGroupIdent, readerIdent; readerGroupIdent, readerIdent;
UA_DataSetReaderConfig readerConfig; UA_DataSetReaderConfig readerConfig;
bool isPublisher;
VariableData (*pubsubGetValue)(UA_String identifier); VariableData (*pubsubGetValue)(UA_String identifier);
static void (*callbackUpdate)(UA_UInt32, const UA_DataValue*, bool print); static void (*callbackUpdate)(UA_UInt32, const UA_DataValue*, bool print);
static void fillDataSetMetaData(UA_DataSetMetaDataType *pMetaData, static void fillDataSetMetaData(UA_DataSetMetaDataType *pMetaData,
VariableData *variableArray, VariableStruct variables, int id);
size_t nbVariable, int id);
static UA_StatusCode static UA_StatusCode
addPubSubConnection(UA_Server *server, UA_String *transportProfile, addPubSubConnection(UA_Server *server, UA_String *transportProfile,
...@@ -234,14 +232,12 @@ addReaderGroup(UA_Server *server) { ...@@ -234,14 +232,12 @@ addReaderGroup(UA_Server *server) {
* SubscribedDataSet and be contained within a ReaderGroup. */ * SubscribedDataSet and be contained within a ReaderGroup. */
/* Add DataSetReader to the ReaderGroup */ /* Add DataSetReader to the ReaderGroup */
static UA_StatusCode static UA_StatusCode
addDataSetReader(UA_Server *server, VariableData *variableArray, addDataSetReader(UA_Server *server, VariableStruct variables, int id) {
size_t nbVariable, int id) {
if(server == NULL) if(server == NULL)
return UA_STATUSCODE_BADINTERNALERROR; return UA_STATUSCODE_BADINTERNALERROR;
/* Setting up Meta data configuration in DataSetReader */ /* Setting up Meta data configuration in DataSetReader */
fillDataSetMetaData(&readerConfig.dataSetMetaData, variableArray, fillDataSetMetaData(&readerConfig.dataSetMetaData, variables, id);
nbVariable, id);
return UA_Server_addDataSetReader(server, readerGroupIdent, return UA_Server_addDataSetReader(server, readerGroupIdent,
&readerConfig, &readerIdent); &readerConfig, &readerIdent);
...@@ -252,7 +248,7 @@ dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitoredItemId, ...@@ -252,7 +248,7 @@ dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitoredItemId,
void *monitoredItemContext, const UA_NodeId *nodeId, void *monitoredItemContext, const UA_NodeId *nodeId,
void *nodeContext, UA_UInt32 attributeId, void *nodeContext, UA_UInt32 attributeId,
const UA_DataValue *var) { const UA_DataValue *var) {
callbackUpdate(nodeId->identifier.numeric, var, !isPublisher); callbackUpdate(nodeId->identifier.numeric, var, false);
} }
/** /**
...@@ -263,7 +259,7 @@ dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitoredItemId, ...@@ -263,7 +259,7 @@ dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitoredItemId,
static UA_StatusCode static UA_StatusCode
addSubscribedVariables(UA_Server *server, UA_NodeId dataSetReaderId, addSubscribedVariables(UA_Server *server, UA_NodeId dataSetReaderId,
VariableData *variableArray, UA_UInt32 nb, VariableData *variableArray, UA_UInt32 nb,
UA_Duration samplingInterval, UA_UInt32 maxVariableNb, UA_Duration samplingInterval,
void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic)) { void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic)) {
if(server == NULL) if(server == NULL)
return UA_STATUSCODE_BADINTERNALERROR; return UA_STATUSCODE_BADINTERNALERROR;
...@@ -312,7 +308,7 @@ addSubscribedVariables(UA_Server *server, UA_NodeId dataSetReaderId, ...@@ -312,7 +308,7 @@ addSubscribedVariables(UA_Server *server, UA_NodeId dataSetReaderId,
UA_NodeId newNode; UA_NodeId newNode;
retval |= UA_Server_addVariableNode(server, retval |= UA_Server_addVariableNode(server,
UA_NODEID_NUMERIC(1, (UA_UInt32) readerConfig.dataSetMetaData.fieldsSize*nb + i + 50000), UA_NODEID_NUMERIC(1, (UA_UInt32)maxVariableNb*nb + i + 50000),
folderId, folderId,
UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
UA_QUALIFIEDNAME(1, (char *)readerConfig.dataSetMetaData.fields[i].name.data), UA_QUALIFIEDNAME(1, (char *)readerConfig.dataSetMetaData.fields[i].name.data),
...@@ -351,8 +347,7 @@ addSubscribedVariables(UA_Server *server, UA_NodeId dataSetReaderId, ...@@ -351,8 +347,7 @@ addSubscribedVariables(UA_Server *server, UA_NodeId dataSetReaderId,
* and PublishedDataSetFields of Publisher */ * and PublishedDataSetFields of Publisher */
/* Define MetaData for TargetVariables */ /* Define MetaData for TargetVariables */
static void fillDataSetMetaData(UA_DataSetMetaDataType *pMetaData, static void fillDataSetMetaData(UA_DataSetMetaDataType *pMetaData,
VariableData *variableArray, VariableStruct variables, int id) {
size_t nbVariable, int id) {
char name[12]; char name[12];
UA_snprintf(name, sizeof(name) - 1, "DataSet %d", id); UA_snprintf(name, sizeof(name) - 1, "DataSet %d", id);
...@@ -364,7 +359,7 @@ static void fillDataSetMetaData(UA_DataSetMetaDataType *pMetaData, ...@@ -364,7 +359,7 @@ static void fillDataSetMetaData(UA_DataSetMetaDataType *pMetaData,
/* Definition of number of fields sizeto create different /* Definition of number of fields sizeto create different
* targetVariables of distinct datatype */ * targetVariables of distinct datatype */
pMetaData->fieldsSize = nbVariable; pMetaData->fieldsSize = variables.nbVariable;
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "fieldsSize %d", (int) pMetaData->fieldsSize); UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "fieldsSize %d", (int) pMetaData->fieldsSize);
pMetaData->fields = (UA_FieldMetaData*)UA_Array_new (pMetaData->fieldsSize, pMetaData->fields = (UA_FieldMetaData*)UA_Array_new (pMetaData->fieldsSize,
&UA_TYPES[UA_TYPES_FIELDMETADATA]); &UA_TYPES[UA_TYPES_FIELDMETADATA]);
...@@ -372,26 +367,26 @@ static void fillDataSetMetaData(UA_DataSetMetaDataType *pMetaData, ...@@ -372,26 +367,26 @@ static void fillDataSetMetaData(UA_DataSetMetaDataType *pMetaData,
for(size_t i = 0; i < pMetaData->fieldsSize; i++) { for(size_t i = 0; i < pMetaData->fieldsSize; i++) {
UA_FieldMetaData_init (&pMetaData->fields[i]); UA_FieldMetaData_init (&pMetaData->fields[i]);
UA_NodeId_copy(&UA_TYPES[variableArray[i].type].typeId, &pMetaData->fields[i].dataType); UA_NodeId_copy(&UA_TYPES[variables.variableArray[i].type].typeId, &pMetaData->fields[i].dataType);
pMetaData->fields[i].builtInType = variableArray[i].builtInType; pMetaData->fields[i].builtInType = variables.variableArray[i].builtInType;
pMetaData->fields[i].name = UA_STRING (variableArray[i].name); pMetaData->fields[i].name = UA_STRING (variables.variableArray[i].name);
pMetaData->fields[i].valueRank = variableArray[i].valueRank; pMetaData->fields[i].valueRank = variables.variableArray[i].valueRank;
pMetaData->fields[i].arrayDimensions = variableArray[i].arrayDimensions; pMetaData->fields[i].arrayDimensions = variables.variableArray[i].arrayDimensions;
pMetaData->fields[i].arrayDimensionsSize = variableArray[i].arrayDimensionsSize; pMetaData->fields[i].arrayDimensionsSize = variables.variableArray[i].arrayDimensionsSize;
} }
} }
static void static void
setVariableType(UA_Server *server, VariableData *variableArray, size_t nbVariable) { setVariableType(UA_Server *server, VariableStruct variables) {
VariableData vDetails; VariableData vDetails;
UA_VariableTypeAttributes vtAttr; UA_VariableTypeAttributes vtAttr;
for(UA_UInt32 i = 0; i < nbVariable; i++) { for(UA_UInt32 i = 0; i < variables.nbVariable; i++) {
vDetails = variableArray[i]; vDetails = variables.variableArray[i];
switch(vDetails.valueRank) { switch(vDetails.valueRank) {
case UA_VALUERANK_SCALAR: case UA_VALUERANK_SCALAR:
variableArray[i].typeNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE); vDetails.typeNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE);
break; break;
case UA_VALUERANK_ONE_DIMENSION: case UA_VALUERANK_ONE_DIMENSION:
...@@ -433,10 +428,8 @@ setServer(UA_String *transportProfile, ...@@ -433,10 +428,8 @@ setServer(UA_String *transportProfile,
} }
static UA_StatusCode static UA_StatusCode
subscribe(UA_Server *server, subscribe(UA_Server *server, InstanceData *instanceArray, UA_UInt32 id,
VariableData *variableArray, size_t nbVariable, UA_UInt32 nbReader, UA_UInt32 maxVariableNb, UA_Duration interval,
UA_UInt32 id, UA_UInt32 nbReader, UA_Duration interval,
void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic),
UA_UInt16 (*get_reader_id)(UA_UInt32 nb), UA_UInt16 (*get_reader_id)(UA_UInt32 nb),
void (*update)(UA_UInt32 id, const UA_DataValue*, bool print)) { void (*update)(UA_UInt32 id, const UA_DataValue*, bool print)) {
UA_UInt16 publisherIdent; UA_UInt16 publisherIdent;
...@@ -460,12 +453,16 @@ subscribe(UA_Server *server, ...@@ -460,12 +453,16 @@ subscribe(UA_Server *server,
readerConfig.name = UA_STRING(readerName); readerConfig.name = UA_STRING(readerName);
readerConfig.publisherId.data = &publisherIdent; readerConfig.publisherId.data = &publisherIdent;
retval = addDataSetReader(server, variableArray, nbVariable, publisherIdent); retval = addDataSetReader(server, instanceArray[i].variables,
publisherIdent);
if (retval != UA_STATUSCODE_GOOD) if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE; return EXIT_FAILURE;
/* Add SubscribedVariables to the created DataSetReader */ /* Add SubscribedVariables to the created DataSetReader */
retval = addSubscribedVariables(server, readerIdent, variableArray, i, interval, init_node_id); retval = addSubscribedVariables(server, readerIdent,
instanceArray[i].variables.variableArray,
i, maxVariableNb, interval,
instanceArray[i].init_node_id);
if (retval != UA_STATUSCODE_GOOD) if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE; return EXIT_FAILURE;
} }
...@@ -475,43 +472,40 @@ subscribe(UA_Server *server, ...@@ -475,43 +472,40 @@ subscribe(UA_Server *server,
int runPubsub(UA_String *transportProfile, int runPubsub(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl, UA_NetworkAddressUrlDataType *networkAddressUrl,
VariableData *variableArray, size_t nbVariable, VariableStruct variables, UA_UInt32 id,
UA_UInt32 id, UA_UInt32 nbReader, UA_Duration interval, InstanceData *readerArray, UA_UInt32 nbReader,
void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic), UA_UInt32 maxVariableNb, UA_Duration interval,
UA_UInt16 (*get_reader_id)(UA_UInt32 nb), UA_UInt16 (*get_reader_id)(UA_UInt32 nb),
VariableData (*get_value)(UA_String identifier), VariableData (*get_value)(UA_String identifier),
void (*update)(UA_UInt32 id, const UA_DataValue*, bool print), void (*update)(UA_UInt32 id, const UA_DataValue*, bool print),
bool publish, UA_Boolean *running) { UA_Boolean *running) {
UA_Server *server; UA_Server *server;
UA_StatusCode retval; UA_StatusCode retval;
server = setServer(transportProfile, networkAddressUrl, id); server = setServer(transportProfile, networkAddressUrl, id);
setVariableType(server, variableArray, nbVariable); setVariableType(server, variables);
/* Publishing */ /* Publishing */
isPublisher = publish; pubsubGetValue = get_value;
if (isPublisher) {
pubsubGetValue = get_value;
addPublishedDataSet(server, id); addPublishedDataSet(server, id);
for(UA_UInt32 i = 0; i < nbVariable; i++) { for(UA_UInt32 i = 0; i < variables.nbVariable; i++) {
retval = addDataSourceVariable(server, variableArray[i]); retval = addDataSourceVariable(server, variables.variableArray[i]);
if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
addDataSetField(server, variableArray[i]);
}
addWriterGroup(server, interval);
retval = addDataSetWriter(server);
if (retval != UA_STATUSCODE_GOOD) if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE; return EXIT_FAILURE;
addDataSetField(server, variables.variableArray[i]);
} }
addWriterGroup(server, interval);
retval = addDataSetWriter(server);
if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
/* Subscribing */ /* Subscribing */
subscribe(server, variableArray, nbVariable, id, nbReader, interval, subscribe(server, readerArray, id, nbReader, maxVariableNb, interval,
init_node_id, get_reader_id, update); get_reader_id, update);
retval = UA_Server_run(server, running); retval = UA_Server_run(server, running);
UA_Server_delete(server); UA_Server_delete(server);
......
...@@ -7,6 +7,7 @@ static UA_Boolean pubsubShouldRun = true; ...@@ -7,6 +7,7 @@ static UA_Boolean pubsubShouldRun = true;
static UA_Boolean pubsubExited = false; static UA_Boolean pubsubExited = false;
static UA_UInt32 nbDrone; static UA_UInt32 nbDrone;
static UA_UInt32 nbSubscriber;
static JSValueConst *droneObjectIdList; static JSValueConst *droneObjectIdList;
static MessageQueue messageQueue = { static MessageQueue messageQueue = {
.head = NULL, .head = NULL,
...@@ -188,15 +189,15 @@ VariableData pubsub_get_value(UA_String identifier) { ...@@ -188,15 +189,15 @@ VariableData pubsub_get_value(UA_String identifier) {
UA_DataType type; UA_DataType type;
UA_Double *array; UA_Double *array;
for(UA_UInt32 i = 0; i < countof(droneVariableArray); i++) { for(UA_UInt32 i = 0; i < droneVariables.nbVariable; i++) {
UA_String name = UA_STRING(droneVariableArray[i].name); UA_String name = UA_STRING(droneVariables.variableArray[i].name);
if(UA_String_equal(&identifier, &name)) { if(UA_String_equal(&identifier, &name)) {
varDetails = droneVariableArray[i]; varDetails = droneVariables.variableArray[i];
switch(varDetails.valueRank) { switch(varDetails.valueRank) {
case UA_VALUERANK_SCALAR: case UA_VALUERANK_SCALAR:
switch(varDetails.type) { switch(varDetails.type) {
case UA_TYPES_STRING: case UA_TYPES_STRING:
*(UA_String*)varDetails.value = droneVariableArray[i].getter.getString(); *(UA_String*)varDetails.value = varDetails.getter.getString();
break; break;
default: default:
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "UA_TYPE not handled"); UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "UA_TYPE not handled");
...@@ -207,7 +208,7 @@ VariableData pubsub_get_value(UA_String identifier) { ...@@ -207,7 +208,7 @@ VariableData pubsub_get_value(UA_String identifier) {
case UA_VALUERANK_ONE_DIMENSION: case UA_VALUERANK_ONE_DIMENSION:
type = UA_TYPES[varDetails.type]; type = UA_TYPES[varDetails.type];
size_t size = varDetails.arrayDimensions[0]; size_t size = varDetails.arrayDimensions[0];
array = (UA_Double *) droneVariableArray[i].getter.getArray(); array = (UA_Double *) varDetails.getter.getArray();
if(type.pointerFree) { if(type.pointerFree) {
memcpy(varDetails.value, array, type.memSize * size); memcpy(varDetails.value, array, type.memSize * size);
...@@ -237,7 +238,7 @@ VariableData pubsub_get_value(UA_String identifier) { ...@@ -237,7 +238,7 @@ VariableData pubsub_get_value(UA_String identifier) {
return varDetails; return varDetails;
} }
void init_node_id(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic) { void init_drone_node_id(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic) {
JSDroneData *s = (JSDroneData *) JS_GetOpaque(droneObjectIdList[nb], jsDroneClassId); JSDroneData *s = (JSDroneData *) JS_GetOpaque(droneObjectIdList[nb], jsDroneClassId);
switch(magic) { switch(magic) {
case 0: case 0:
...@@ -255,6 +256,18 @@ void init_node_id(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic) { ...@@ -255,6 +256,18 @@ void init_node_id(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic) {
} }
} }
void init_subscriber_node_id(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic) {
JSDroneData *s = (JSDroneData *) JS_GetOpaque(droneObjectIdList[nb], jsDroneClassId);
switch(magic) {
case 0:
s->messageId = id;
break;
default:
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Unknown variable id");
break;
}
}
static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool print) static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool print)
{ {
JSDroneData* s; JSDroneData* s;
...@@ -262,7 +275,7 @@ static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool ...@@ -262,7 +275,7 @@ static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool
UA_Double* positionArray; UA_Double* positionArray;
UA_Float* speedArray; UA_Float* speedArray;
for(UA_UInt32 i = 0; i < nbDrone; i++) { for(UA_UInt32 i = 0; i < nbDrone + nbSubscriber; i++) {
s = (JSDroneData *) JS_GetOpaque(droneObjectIdList[i], jsDroneClassId); s = (JSDroneData *) JS_GetOpaque(droneObjectIdList[i], jsDroneClassId);
if (s->positionArrayId == id) { if (s->positionArrayId == id) {
positionArray = (UA_Double*) var->value.data; positionArray = (UA_Double*) var->value.data;
...@@ -292,21 +305,14 @@ static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool ...@@ -292,21 +305,14 @@ static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool
} else if (s->messageId == id) { } else if (s->messageId == id) {
uaStr = *(UA_String*) var->value.data; uaStr = *(UA_String*) var->value.data;
if (!print) { pthread_mutex_lock(&mutex);
pthread_mutex_lock(&mutex); while(strlen(s->message) != 0)
while(strlen(s->message) != 0) pthread_cond_wait(&threadCond, &mutex);
pthread_cond_wait(&threadCond, &mutex);
}
memcpy(s->message, uaStr.data, uaStr.length); memcpy(s->message, uaStr.data, uaStr.length);
s->message[uaStr.length] = '\0'; s->message[uaStr.length] = '\0';
if (print) { pthread_mutex_unlock(&mutex);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT,
"Received message for drone %d: %s", s->id, s->message);
} else {
pthread_mutex_unlock(&mutex);
}
return; return;
} }
} }
...@@ -331,6 +337,10 @@ static JSValue js_run_pubsub(JSContext *ctx, JSValueConst this_val, ...@@ -331,6 +337,10 @@ static JSValue js_run_pubsub(JSContext *ctx, JSValueConst this_val,
char urlBuffer[44]; char urlBuffer[44];
UA_UInt32 id; UA_UInt32 id;
UA_Duration interval; UA_Duration interval;
bool isADrone;
VariableStruct variables;
InstanceData *instanceArray;
UA_UInt32 nbPeer = nbDrone + nbSubscriber;
int res; int res;
ipv6 = JS_ToCString(ctx, argv[0]); ipv6 = JS_ToCString(ctx, argv[0]);
...@@ -352,13 +362,26 @@ static JSValue js_run_pubsub(JSContext *ctx, JSValueConst this_val, ...@@ -352,13 +362,26 @@ static JSValue js_run_pubsub(JSContext *ctx, JSValueConst this_val,
if (JS_ToFloat64(ctx, &interval, argv[4])) if (JS_ToFloat64(ctx, &interval, argv[4]))
return JS_EXCEPTION; return JS_EXCEPTION;
res = runPubsub(&transportProfile, &networkAddressUrl, droneVariableArray, isADrone = JS_ToBool(ctx, argv[5]);
countof(droneVariableArray), id, nbDrone, interval, variables = isADrone ? droneVariables : subscriberVariables;
init_node_id, get_drone_id, pubsub_get_value,
pubsub_update_variables, JS_ToBool(ctx, argv[5]), instanceArray = (InstanceData *) malloc((nbPeer) * sizeof(InstanceData));
for(UA_UInt32 i = 0; i < nbDrone; i++) {
instanceArray[i].variables = droneVariables;
instanceArray[i].init_node_id = init_drone_node_id;
}
for(UA_UInt32 i = nbDrone; i < nbPeer; i++) {
instanceArray[i].variables = subscriberVariables;
instanceArray[i].init_node_id = init_subscriber_node_id;
}
res = runPubsub(&transportProfile, &networkAddressUrl, variables, id,
instanceArray, nbPeer, MAX_VARIABLE_NB, interval,
get_drone_id, pubsub_get_value, pubsub_update_variables,
&pubsubShouldRun); &pubsubShouldRun);
pubsubExited = true; pubsubExited = true;
free(instanceArray);
JS_FreeCString(ctx, ipv6); JS_FreeCString(ctx, ipv6);
JS_FreeCString(ctx, port); JS_FreeCString(ctx, port);
free(notConstNetIface); free(notConstNetIface);
...@@ -372,8 +395,11 @@ static JSValue js_init_pubsub(JSContext *ctx, JSValueConst thisVal, ...@@ -372,8 +395,11 @@ static JSValue js_init_pubsub(JSContext *ctx, JSValueConst thisVal,
if (JS_ToUint32(ctx, &nbDrone, argv[0])) if (JS_ToUint32(ctx, &nbDrone, argv[0]))
return JS_EXCEPTION; return JS_EXCEPTION;
if (JS_ToUint32(ctx, &nbSubscriber, argv[1]))
return JS_EXCEPTION;
currentMessage = UA_STRING(""); currentMessage = UA_STRING("");
droneObjectIdList = (JSValue *) malloc(nbDrone * sizeof(JSValueConst)); droneObjectIdList = (JSValue *) malloc((nbDrone + nbSubscriber) * sizeof(JSValueConst));
return JS_NewInt32(ctx, 0); return JS_NewInt32(ctx, 0);
} }
...@@ -393,6 +419,7 @@ static JSValue js_stop_pubsub(JSContext *ctx, JSValueConst thisVal, ...@@ -393,6 +419,7 @@ static JSValue js_stop_pubsub(JSContext *ctx, JSValueConst thisVal,
delete_message_node(current); delete_message_node(current);
} }
clear_message(currentMessage); clear_message(currentMessage);
return JS_NewInt32(ctx, 0); return JS_NewInt32(ctx, 0);
} }
...@@ -617,7 +644,7 @@ static const JSCFunctionListEntry js_funcs[] = { ...@@ -617,7 +644,7 @@ static const JSCFunctionListEntry js_funcs[] = {
JS_CFUNC_DEF("getAirspeed", 0, js_getSpeed ), JS_CFUNC_DEF("getAirspeed", 0, js_getSpeed ),
JS_CFUNC_DEF("getClimbRate", 0, js_getClimbRate ), JS_CFUNC_DEF("getClimbRate", 0, js_getClimbRate ),
JS_CFUNC_DEF("healthAllOk", 0, js_healthAllOk ), JS_CFUNC_DEF("healthAllOk", 0, js_healthAllOk ),
JS_CFUNC_DEF("initPubsub", 1, js_init_pubsub ), JS_CFUNC_DEF("initPubsub", 2, js_init_pubsub ),
}; };
static int js_init(JSContext *ctx, JSModuleDef *m) static int js_init(JSContext *ctx, JSModuleDef *m)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment