Commit 2f940dfa authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

Use mutex to manipulate queues

parent 7561aeda
...@@ -37,7 +37,9 @@ typedef struct { ...@@ -37,7 +37,9 @@ typedef struct {
UA_Float speed; UA_Float speed;
UA_Float climbRate; UA_Float climbRate;
StrQueue receiveMessageQueue; StrQueue receiveMessageQueue;
pthread_mutex_t messageMutex;
StrQueue receiveLogQueue; StrQueue receiveLogQueue;
pthread_mutex_t logMutex;
} JSDroneData; } JSDroneData;
typedef struct { typedef struct {
......
...@@ -107,15 +107,18 @@ static UA_Boolean pubsubExited = true; ...@@ -107,15 +107,18 @@ static UA_Boolean pubsubExited = true;
static UA_UInt32 nbDrone; static UA_UInt32 nbDrone;
static UA_UInt32 nbSubscriber; static UA_UInt32 nbSubscriber;
static JSValueConst *droneObjectIdList; static JSValueConst *droneObjectIdList;
static StrQueue sendMessageQueue = {
StrQueue sendMessageQueue = {
.head = NULL, .head = NULL,
.tail = NULL, .tail = NULL,
}; };
pthread_mutex_t messageMutex;
UA_String currentSendMessage; UA_String currentSendMessage;
static StrQueue sendLogQueue = { StrQueue sendLogQueue = {
.head = NULL, .head = NULL,
.tail = NULL, .tail = NULL,
}; };
pthread_mutex_t logMutex;
UA_String currentSendLog; UA_String currentSendLog;
bool isADrone; bool isADrone;
...@@ -257,19 +260,22 @@ static JSValue js_drone_init(JSContext *ctx, JSValueConst thisVal, ...@@ -257,19 +260,22 @@ static JSValue js_drone_init(JSContext *ctx, JSValueConst thisVal,
{ {
int nb; int nb;
JSDroneData *s = (JSDroneData *) JS_GetOpaque2(ctx, thisVal, jsDroneClassId); JSDroneData *s = (JSDroneData *) JS_GetOpaque2(ctx, thisVal, jsDroneClassId);
if (!s) if (!s)
return JS_EXCEPTION; return JS_EXCEPTION;
if (JS_ToInt32(ctx, &nb, argv[0])) if (JS_ToInt32(ctx, &nb, argv[0]))
return JS_EXCEPTION; return JS_EXCEPTION;
droneObjectIdList[nb] = thisVal; droneObjectIdList[nb] = thisVal;
return JS_UNDEFINED; return JS_UNDEFINED;
} }
static JSValue readDroneDataStr(JSContext *ctx, StrQueue *pQueue, bool keepAtLeastAnElement) static JSValue getStrFromQueue(JSContext *ctx, StrQueue *pQueue, pthread_mutex_t *mutex)
{ {
JSValue res; JSValue res;
struct strNode *current; struct strNode *current;
pthread_mutex_lock(mutex);
current = pQueue->head; current = pQueue->head;
if (current != NULL) { if (current != NULL) {
res = JS_NewString(ctx, current->str); res = JS_NewString(ctx, current->str);
...@@ -277,14 +283,13 @@ static JSValue readDroneDataStr(JSContext *ctx, StrQueue *pQueue, bool keepAtLea ...@@ -277,14 +283,13 @@ static JSValue readDroneDataStr(JSContext *ctx, StrQueue *pQueue, bool keepAtLea
pQueue->head = current->next; pQueue->head = current->next;
delete_str_node(current); delete_str_node(current);
} else { } else {
if (!keepAtLeastAnElement) {
pQueue->head = (pQueue->tail = NULL); pQueue->head = (pQueue->tail = NULL);
delete_str_node(current); delete_str_node(current);
} }
}
} else { } else {
res = JS_NewString(ctx, ""); res = JS_NewString(ctx, "");
} }
pthread_mutex_unlock(mutex);
return res; return res;
} }
...@@ -311,9 +316,9 @@ static JSValue js_drone_get(JSContext *ctx, JSValueConst thisVal, int magic) ...@@ -311,9 +316,9 @@ static JSValue js_drone_get(JSContext *ctx, JSValueConst thisVal, int magic)
case 7: case 7:
return JS_NewFloat64(ctx, s->climbRate); return JS_NewFloat64(ctx, s->climbRate);
case 8: case 8:
return readDroneDataStr(ctx, &(s->receiveMessageQueue), true); return getStrFromQueue(ctx, &(s->receiveMessageQueue), &(s->messageMutex));
case 9: case 9:
return readDroneDataStr(ctx, &(s->receiveLogQueue), false); return getStrFromQueue(ctx, &(s->receiveLogQueue), &(s->logMutex));
case 10: case 10:
return JS_NewInt64(ctx, s->timestamp); return JS_NewInt64(ctx, s->timestamp);
default: default:
...@@ -321,32 +326,35 @@ static JSValue js_drone_get(JSContext *ctx, JSValueConst thisVal, int magic) ...@@ -321,32 +326,35 @@ static JSValue js_drone_get(JSContext *ctx, JSValueConst thisVal, int magic)
} }
} }
static void addStrToQueue(const char *str, StrQueue *pQueue) static void addStrToQueue(const char *str, StrQueue *pQueue, pthread_mutex_t *mutex)
{ {
struct strNode *newNode; struct strNode *newNode;
newNode = (struct strNode*)malloc(sizeof(struct strNode)); newNode = (struct strNode*)malloc(sizeof(struct strNode));
newNode->str = strdup(str); newNode->str = strdup(str);
newNode->next = NULL; newNode->next = NULL;
pthread_mutex_lock(mutex);
if (pQueue->tail == NULL) { if (pQueue->tail == NULL) {
pQueue->head = pQueue->tail = newNode; pQueue->head = pQueue->tail = newNode;
} else { } else {
pQueue->tail->next = newNode; pQueue->tail->next = newNode;
pQueue->tail = newNode; pQueue->tail = newNode;
} }
pthread_mutex_unlock(mutex);
} }
static JSValue add_jsstr_to_queue(JSContext *ctx, JSValueConst jsStr, static JSValue add_jsstr_to_queue(JSContext *ctx, JSValueConst jsStr,
StrQueue *pQueue) StrQueue *pQueue, pthread_mutex_t *mutex)
{ {
const char *str; const char *str;
str = JS_ToCString(ctx, jsStr); str = JS_ToCString(ctx, jsStr);
if (strlen(str) > MAX_STR_SIZE) { if (strlen(str) > MAX_STR_SIZE) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "String too long"); UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "String too long");
JS_FreeCString(ctx, str);
return JS_EXCEPTION; return JS_EXCEPTION;
} }
addStrToQueue(str, pQueue); addStrToQueue(str, pQueue, mutex);
JS_FreeCString(ctx, str); JS_FreeCString(ctx, str);
return JS_UNDEFINED; return JS_UNDEFINED;
} }
...@@ -354,13 +362,13 @@ static JSValue add_jsstr_to_queue(JSContext *ctx, JSValueConst jsStr, ...@@ -354,13 +362,13 @@ static JSValue add_jsstr_to_queue(JSContext *ctx, JSValueConst jsStr,
static JSValue js_drone_set_message(JSContext *ctx, JSValueConst thisVal, static JSValue js_drone_set_message(JSContext *ctx, JSValueConst thisVal,
int argc, JSValueConst *argv) int argc, JSValueConst *argv)
{ {
return add_jsstr_to_queue(ctx, argv[0], &sendMessageQueue); return add_jsstr_to_queue(ctx, argv[0], &sendMessageQueue, &messageMutex);
} }
static JSValue js_drone_set_log(JSContext *ctx, JSValueConst thisVal, static JSValue js_drone_set_log(JSContext *ctx, JSValueConst thisVal,
int argc, JSValueConst *argv) int argc, JSValueConst *argv)
{ {
return add_jsstr_to_queue(ctx, argv[0], &sendLogQueue); return add_jsstr_to_queue(ctx, argv[0], &sendLogQueue, &logMutex);
} }
static UA_Boolean UA_String_isEmpty(const UA_String *s) { static UA_Boolean UA_String_isEmpty(const UA_String *s) {
...@@ -372,28 +380,32 @@ static void clear_str(UA_String *pstr) { ...@@ -372,28 +380,32 @@ static void clear_str(UA_String *pstr) {
UA_String_clear(pstr); UA_String_clear(pstr);
} }
static UA_String get_StrQueue_content(StrQueue *pQueue, UA_String currentStr) static UA_String get_ua_str_from_queue(StrQueue *pQueue, pthread_mutex_t *mutex, UA_String currentStr)
{ {
struct strNode *current; struct strNode *current;
clear_str(&currentStr);
current = pQueue->head; current = pQueue->head;
pthread_mutex_lock(mutex);
if (current != NULL) { if (current != NULL) {
clear_str(&currentStr);
currentStr = UA_STRING_ALLOC(current->str); currentStr = UA_STRING_ALLOC(current->str);
pQueue->head = current->next == NULL ? (pQueue->tail = NULL) : current->next; pQueue->head = current->next == NULL ? (pQueue->tail = NULL) : current->next;
delete_str_node(current); delete_str_node(current);
} else {
currentStr = UA_STRING("");
} }
pthread_mutex_unlock(mutex);
return currentStr; return currentStr;
} }
UA_String get_message(void) UA_String get_message(void)
{ {
return get_StrQueue_content(&sendMessageQueue, currentSendMessage); return get_ua_str_from_queue(&sendMessageQueue, &messageMutex, currentSendMessage);
} }
UA_String get_log(void) UA_String get_log(void)
{ {
return get_StrQueue_content(&sendLogQueue, currentSendLog); return get_ua_str_from_queue(&sendLogQueue, &logMutex, currentSendLog);
} }
static JSClassDef jsDroneClass = { static JSClassDef jsDroneClass = {
...@@ -482,16 +494,17 @@ VariableData pubsub_get_value(UA_String identifier) { ...@@ -482,16 +494,17 @@ VariableData pubsub_get_value(UA_String identifier) {
return varDetails; return varDetails;
} }
static void setDroneDataStr(void *data, StrQueue* pQueue) static void setDroneDataStr(void *data, StrQueue* pQueue, pthread_mutex_t *mutex)
{ {
UA_String uaStr = *(UA_String*) data; UA_String uaStr = *(UA_String*) data;
char str[MAX_STR_SIZE]; char str[MAX_STR_SIZE];
memcpy(str, uaStr.data, uaStr.length); memcpy(str, uaStr.data, uaStr.length);
str[uaStr.length] = '\0'; str[uaStr.length] = '\0';
addStrToQueue(str, pQueue); addStrToQueue(str, pQueue, mutex);
} }
static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool print) static void pubsub_update_variables(UA_UInt32 id,
const UA_DataValue *var, bool print)
{ {
JSDroneData* s; JSDroneData* s;
UA_Int64* updatedPositionArray; UA_Int64* updatedPositionArray;
...@@ -526,11 +539,11 @@ static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool ...@@ -526,11 +539,11 @@ static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool
} }
return; return;
} else if (pubsubIdsArray[i].message == id) { } else if (pubsubIdsArray[i].message == id) {
setDroneDataStr(var->value.data, &(s->receiveMessageQueue)); setDroneDataStr(var->value.data, &(s->receiveMessageQueue), &(s->messageMutex));
return; return;
} else if (pubsubIdsArray[i].log == id) { } else if (pubsubIdsArray[i].log == id) {
if (!isADrone) { if (!isADrone) {
setDroneDataStr(var->value.data, &(s->receiveLogQueue)); setDroneDataStr(var->value.data, &(s->receiveLogQueue), &(s->logMutex));
} }
return; return;
} }
...@@ -641,7 +654,6 @@ static JSValue js_stop_pubsub(JSContext *ctx, JSValueConst thisVal, ...@@ -641,7 +654,6 @@ static JSValue js_stop_pubsub(JSContext *ctx, JSValueConst thisVal,
while(!pubsubExited) while(!pubsubExited)
sleep(1); sleep(1);
free(droneObjectIdList); free(droneObjectIdList);
cleanQueue(&sendMessageQueue); cleanQueue(&sendMessageQueue);
clear_str(&currentSendMessage); clear_str(&currentSendMessage);
cleanQueue(&sendLogQueue); cleanQueue(&sendLogQueue);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment