Commit eae230b6 authored by Ivan Tyagov's avatar Ivan Tyagov

Keep alive

See merge request !15
parents 3cd553d5 a46874c3
#!/bin/bash
rm -f server
# build OPC UA server which publishes msgs
gcc -o server -I /usr/local/include/ -std=c99 server.c -l:libopen62541.so -L/usr/local/lib -lmbedcrypto -lmbedx509 -I ~/open62541/src/pubsub/ -I ~/open62541/deps/
/* This work is licensed under a Creative Commons CCZero 1.0 Universal License. /* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
* See http://creativecommons.org/publicdomain/zero/1.0/for more information. */ * See http://creativecommons.org/publicdomain/zero/1.0/for more information. */
#define countof(a) (sizeof(a)/sizeof(*(a)))
#include <sys/time.h>
#include <stdio.h>
#include <open62541/server.h> #include <open62541/server.h>
int getMicroSeconds() {
struct timeval current_time;
gettimeofday(&current_time, NULL);
long int ms = current_time.tv_sec * 1000 + current_time.tv_usec / 1000;
return ms;
}
/* loadFile parses the certificate file. /* loadFile parses the certificate file.
* *
* @param path specifies the file name given in argv[] * @param path specifies the file name given in argv[]
...@@ -51,3 +63,82 @@ char *randomString(size_t length) ...@@ -51,3 +63,82 @@ char *randomString(size_t length)
} }
return randomString; return randomString;
} }
char *convertInt2Str(int my_int){
/* Convert integer to string */
int length = snprintf( NULL, 0, "%d", my_int);
char *my_str = malloc(length + 1);
snprintf(my_str, length + 1, "%d", my_int);
return my_str;
}
char *convertLongInt2Str(long int my_int){
/* Convert integer to string */
int length = snprintf( NULL, 0, "%ld", my_int);
char *my_str = malloc(length + 1);
snprintf(my_str, length + 1, "%ld", my_int);
return my_str;
}
// XXX: dictionary implementation based on https://gist.github.com/kylef/86784/fe97567ec9baf5c0dce3c7fcbec948e21dfcce09
typedef struct dict_t_struct {
char *key;
void *value;
struct dict_t_struct *next;
} dict_t;
dict_t **dictAlloc(void) {
return malloc(sizeof(dict_t));
}
void dictDealloc(dict_t **dict) {
free(dict);
}
void *getItem(dict_t *dict, char *key) {
dict_t *ptr;
for (ptr = dict; ptr != NULL; ptr = ptr->next) {
if (strcmp(ptr->key, key) == 0) {
return ptr->value;
}
}
return NULL;
}
void delItem(dict_t **dict, char *key) {
dict_t *ptr, *prev;
for (ptr = *dict, prev = NULL; ptr != NULL; prev = ptr, ptr = ptr->next) {
if (strcmp(ptr->key, key) == 0) {
if (ptr->next != NULL) {
if (prev == NULL) {
*dict = ptr->next;
} else {
prev->next = ptr->next;
}
} else if (prev != NULL) {
prev->next = NULL;
} else {
*dict = NULL;
}
free(ptr->key);
free(ptr);
return;
}
}
}
void addItem(dict_t **dict, char *key, void *value) {
delItem(dict, key); /* If we already have a item with this key, delete it. */
dict_t *d = malloc(sizeof(struct dict_t_struct));
d->key = malloc(strlen(key)+1);
strcpy(d->key, key);
d->value = value;
d->next = *dict;
*dict = d;
}
#!/bin/bash
./server -m 1 -b 1 -i 1 -l 2,3 -t 500
#!/bin/bash
./server -p 4841 -m 1 -b 1 -i 2 -l 1,3 -t 500
#!/bin/bash
./server -p 4842 -m 1 -b 1 -i 3 -l 1,2 -t 500
// global HEART BEATs of coupler
static unsigned int HEART_BEATS = 0;
// the heart beat interval$$
const int DEFAULT_HEART_BEAT_INTERVAL = 250;
static int HEART_BEAT_INTERVAL = DEFAULT_HEART_BEAT_INTERVAL;
// the list of couplers onto which we depend for properly running$
unsigned int HEART_BEAT_ID_LIST[] = {0, 0, 0, 0, 0, 0, 0, 0};
// the interval for publishing messages$
const int PUBLISHING_INTERVAL = 100;
// a hard coded writer group, data set and publisher ID$
// (should be same for publisher / subscriber)$
const int WRITER_GROUP_ID = 100;
const int DATASET_WRITER_ID = 62541;
const int PUBLISHER_ID = 2234;
/*
Keep alive implementation for couplers based on OPC UA's pub/sub mechanism
*/
#include "keep_alive.h"
UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent;
static void addPubSubConnection(UA_Server *server, UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl){
/* Details about the connection configuration and handling are located
* in the pubsub connection tutorial */
UA_PubSubConnectionConfig connectionConfig;
memset(&connectionConfig, 0, sizeof(connectionConfig));
connectionConfig.name = UA_STRING("UADP Connection 1");
connectionConfig.transportProfileUri = *transportProfile;
connectionConfig.enabled = UA_TRUE;
UA_Variant_setScalar(&connectionConfig.address, networkAddressUrl,
&UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
/* Changed to static publisherId from random generation to identify
* the publisher on Subscriber side */
connectionConfig.publisherId.numeric = PUBLISHER_ID;
UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
}
/**
* **PublishedDataSet handling**
*
* The PublishedDataSet (PDS) and PubSubConnection are the toplevel entities and
* can exist alone. The PDS contains the collection of the published fields. All
* other PubSub elements are directly or indirectly linked with the PDS or
* connection. */
static void addPublishedDataSet(UA_Server *server) {
/* The PublishedDataSetConfig contains all necessary public
* information for the creation of a new PublishedDataSet */
UA_PublishedDataSetConfig publishedDataSetConfig;
memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
publishedDataSetConfig.name = UA_STRING("Demo PDS");
/* Create new PublishedDataSet based on the PublishedDataSetConfig. */
UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
}
/**
* **WriterGroup handling**
*
* The WriterGroup (WG) is part of the connection and contains the primary
* configuration parameters for the message creation. */
static void addWriterGroup(UA_Server *server) {
/* Now we create a new WriterGroupConfig and add the group to the existing
* PubSubConnection. */
UA_WriterGroupConfig writerGroupConfig;
memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
writerGroupConfig.name = UA_STRING("Demo WriterGroup");
writerGroupConfig.publishingInterval = PUBLISHING_INTERVAL;
writerGroupConfig.enabled = UA_FALSE;
writerGroupConfig.writerGroupId = WRITER_GROUP_ID;
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
writerGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
/* The configuration flags for the messages are encapsulated inside the
* message- and transport settings extension objects. These extension
* objects are defined by the standard. e.g.
* UadpWriterGroupMessageDataType */
UA_UadpWriterGroupMessageDataType *writerGroupMessage = UA_UadpWriterGroupMessageDataType_new();
/* Change message settings of writerGroup to send PublisherId,
* WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
* of NetworkMessage */
writerGroupMessage->networkMessageContentMask = (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;
UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);
UA_Server_setWriterGroupOperational(server, writerGroupIdent);
UA_UadpWriterGroupMessageDataType_delete(writerGroupMessage);
}
/**
* **DataSetWriter handling**
*
* A DataSetWriter (DSW) is the glue between the WG and the PDS. The DSW is
* linked to exactly one PDS and contains additional information for the
* message generation. */
static void addDataSetWriter(UA_Server *server) {
/* We need now a DataSetWriter within the WriterGroup. This means we must
* create a new DataSetWriterConfig and add call the addWriterGroup function. */
UA_NodeId dataSetWriterIdent;
UA_DataSetWriterConfig dataSetWriterConfig;
memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
dataSetWriterConfig.dataSetWriterId = DATASET_WRITER_ID;
dataSetWriterConfig.keyFrameCount = 10;
UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
&dataSetWriterConfig, &dataSetWriterIdent);
}
typedef struct PublishedVariable {
char *name;
char *description;
void * UA_RESTRICT pdefaultValue;
int type;
} PublishedVariable;
static void addPubSubVariable(UA_Server *server, PublishedVariable varDetails) {
UA_VariableAttributes attr = UA_VariableAttributes_default;
UA_Variant_setScalar(&attr.value, varDetails.pdefaultValue, &UA_TYPES[varDetails.type]);
attr.description = UA_LOCALIZEDTEXT("en-US", varDetails.description);
attr.displayName = UA_LOCALIZEDTEXT("en-US", varDetails.description);
attr.dataType = UA_TYPES[varDetails.type].typeId;
attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
UA_Server_addVariableNode(server, UA_NODEID_STRING(1, varDetails.name),
UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES),
UA_QUALIFIEDNAME(1, varDetails.description),
UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
attr, NULL, NULL);
}
static void addPubSubDataSetField(UA_Server *server, PublishedVariable varDetails) {
UA_NodeId dataSetFieldIdent;
UA_DataSetFieldConfig dataSetFieldConfig;
memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING(varDetails.description);
dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
UA_NODEID_STRING(1, varDetails.name);
dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
UA_Server_addDataSetField(server, publishedDataSetIdent,
&dataSetFieldConfig, &dataSetFieldIdent);
}
void callbackTicHeartBeat()
{
/* Increase periodically heart beats of the server */
HEART_BEATS += 1;
//UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "heart_beat %d", HEART_BEATS);
// set OPC UA's heat_beat node value
UA_NodeId myFloatNodeId = UA_NODEID_STRING(1, "heart_beat");
UA_Float myFloat = HEART_BEATS;
UA_Variant myVar;
UA_Variant_init(&myVar);
UA_Variant_setScalar(&myVar, &myFloat, &UA_TYPES[UA_TYPES_FLOAT]);
UA_Server_writeValue(server, myFloatNodeId, myVar);
}
static void enablePublishHeartBeat(UA_Server *server, UA_ServerConfig *config){
int i;
// add a callback which will increment heart beat tics
UA_UInt64 callbackId = 1;
UA_Server_addRepeatedCallback(server, callbackTicHeartBeat, NULL, HEART_BEAT_INTERVAL, &callbackId);
UA_UInt32 defaultUInt32 = 0;
UA_UInt32 couplerID = COUPLER_ID;
UA_Float defaultFloat = 0.0;
const PublishedVariable publishedVariableArray[] = {
// representing time in millis since start of process
{
.name = "heart_beat",
.description = "Heart beat",
.pdefaultValue = &defaultFloat,
.type = UA_TYPES_FLOAT
},
// representing the ID of the coupler
{
.name = "id",
.description = "ID",
.pdefaultValue = &couplerID,
.type = UA_TYPES_UINT32
}
};
UA_String transportProfile = UA_STRING(DEFAULT_TRANSPORT_PROFILE);
UA_NetworkAddressUrlDataType networkAddressUrl =
{UA_STRING_NULL , UA_STRING(DEFAULT_NETWORK_ADDRESS_URL)};
addPubSubConnection(server, &transportProfile, &networkAddressUrl);
addPublishedDataSet(server);
for(i = 0; i < countof(publishedVariableArray); i++) {
addPubSubVariable(server, publishedVariableArray[i]);
addPubSubDataSetField(server, publishedVariableArray[i]);
}
addWriterGroup(server);
addDataSetWriter(server);
}
#include <open62541/client_subscriptions.h>
#include <open62541/plugin/log_stdout.h>
#include <open62541/plugin/pubsub_udp.h>
#include <open62541/server.h>
#include <open62541/server_config_default.h>
#include <open62541/types_generated.h>
#include "ua_pubsub.h"
#include <stdio.h>
#include <signal.h>
#include <stdlib.h>
UA_NodeId connectionIdentifier;
UA_NodeId readerGroupIdentifier;
UA_NodeId readerIdentifier;
UA_DataSetReaderConfig readerConfig;
static void fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData);
/* callback to handle change notifications */
static void dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitoredItemId,
void *monitoredItemContext, const UA_NodeId *nodeId,
void *nodeContext, UA_UInt32 attributeId,
const UA_DataValue *var) {
long int micro_seconds = getMicroSeconds();
// filter out ID from Data Set
if(UA_Variant_hasScalarType(&var->value, &UA_TYPES[UA_TYPES_UINT32])) {
unsigned int coupler_id = *(UA_UInt32*) var->value.data;
// care for other coupler_id NOT ourselves
if (coupler_id!=COUPLER_ID) {
//HEART_BEAT_ID_LIST
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Got heart beat from ID = %d, timestamp=%ld", coupler_id, micro_seconds);
// convert coupler_id to str
char* coupler_id_str = convertInt2Str(coupler_id);
// convert micro seconds to str
char* micro_seconds_str = convertLongInt2Str(micro_seconds);
// Add to our local linked list
addItem(&SUBSCRIBER_DICT, coupler_id_str, micro_seconds_str);
}
}
// filter out heart_beat from Data Set
if(UA_Variant_hasScalarType(&var->value, &UA_TYPES[UA_TYPES_FLOAT])) {
float heart_beat = *(UA_Float*) var->value.data;
//UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "heart_beat = %f", heart_beat);
}
}
/* Add new connection to the server */
static UA_StatusCode addPubSubConnectionXXX(UA_Server *server, UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl) {
if((server == NULL) || (transportProfile == NULL) ||
(networkAddressUrl == NULL)) {
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_StatusCode retval = UA_STATUSCODE_GOOD;
/* Configuration creation for the connection */
UA_PubSubConnectionConfig connectionConfig;
memset (&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
connectionConfig.name = UA_STRING("UDPMC Connection 1");
connectionConfig.transportProfileUri = *transportProfile;
connectionConfig.enabled = UA_TRUE;
UA_Variant_setScalar(&connectionConfig.address, networkAddressUrl,
&UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
connectionConfig.publisherId.numeric = UA_UInt32_random ();
retval |= UA_Server_addPubSubConnection (server, &connectionConfig, &connectionIdentifier);
if (retval != UA_STATUSCODE_GOOD) {
return retval;
}
return retval;
}
/**
* **ReaderGroup**
*
* ReaderGroup is used to group a list of DataSetReaders. All ReaderGroups are
* created within a PubSubConnection and automatically deleted if the connection
* is removed. All network message related filters are only available in the DataSetReader. */
/* Add ReaderGroup to the created connection */
static UA_StatusCode addReaderGroup(UA_Server *server) {
if(server == NULL) {
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_StatusCode retval = UA_STATUSCODE_GOOD;
UA_ReaderGroupConfig readerGroupConfig;
memset (&readerGroupConfig, 0, sizeof(UA_ReaderGroupConfig));
readerGroupConfig.name = UA_STRING("ReaderGroup1");
retval |= UA_Server_addReaderGroup(server, connectionIdentifier, &readerGroupConfig,
&readerGroupIdentifier);
UA_Server_setReaderGroupOperational(server, readerGroupIdentifier);
return retval;
}
/**
* **DataSetReader**
*
* DataSetReader can receive NetworkMessages with the DataSetMessage
* of interest sent by the Publisher. DataSetReader provides
* the configuration necessary to receive and process DataSetMessages
* on the Subscriber side. DataSetReader must be linked with a
* SubscribedDataSet and be contained within a ReaderGroup. */
/* Add DataSetReader to the ReaderGroup */
static UA_StatusCode addDataSetReader(UA_Server *server) {
if(server == NULL) {
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_StatusCode retval = UA_STATUSCODE_GOOD;
memset (&readerConfig, 0, sizeof(UA_DataSetReaderConfig));
readerConfig.name = UA_STRING("DataSet Reader 1");
/* Parameters to filter which DataSetMessage has to be processed
* by the DataSetReader */
/* The following parameters are used to show that the data published by
* tutorial_pubsub_publish.c is being subscribed and is being updated in
* the information model */
UA_UInt16 publisherIdentifier = PUBLISHER_ID;
readerConfig.publisherId.type = &UA_TYPES[UA_TYPES_UINT16];
readerConfig.publisherId.data = &publisherIdentifier;
readerConfig.writerGroupId = WRITER_GROUP_ID;
readerConfig.dataSetWriterId = DATASET_WRITER_ID;
/* Setting up Meta data configuration in DataSetReader */
fillTestDataSetMetaData(&readerConfig.dataSetMetaData);
retval |= UA_Server_addDataSetReader(server, readerGroupIdentifier, &readerConfig,
&readerIdentifier);
return retval;
}
/**
* **SubscribedDataSet**
*
* Set SubscribedDataSet type to TargetVariables data type.
* Add subscribedvariables to the DataSetReader */
static UA_StatusCode addSubscribedVariables(UA_Server *server, UA_NodeId dataSetReaderId) {
if(server == NULL)
return UA_STATUSCODE_BADINTERNALERROR;
UA_StatusCode retval = UA_STATUSCODE_GOOD;
UA_NodeId folderId;
UA_String folderName = readerConfig.dataSetMetaData.name;
UA_ObjectAttributes oAttr = UA_ObjectAttributes_default;
UA_QualifiedName folderBrowseName;
if(folderName.length > 0) {
oAttr.displayName.locale = UA_STRING ("en-US");
oAttr.displayName.text = folderName;
folderBrowseName.namespaceIndex = 1;
folderBrowseName.name = folderName;
}
else {
oAttr.displayName = UA_LOCALIZEDTEXT ("en-US", "Subscribed Variables");
folderBrowseName = UA_QUALIFIEDNAME (1, "Subscribed Variables");
}
UA_Server_addObjectNode (server, UA_NODEID_NULL,
UA_NODEID_NUMERIC (0, UA_NS0ID_OBJECTSFOLDER),
UA_NODEID_NUMERIC (0, UA_NS0ID_ORGANIZES),
folderBrowseName, UA_NODEID_NUMERIC (0,
UA_NS0ID_BASEOBJECTTYPE), oAttr, NULL, &folderId);
/**
* **TargetVariables**
*
* The SubscribedDataSet option TargetVariables defines a list of Variable mappings between
* received DataSet fields and target Variables in the Subscriber AddressSpace.
* The values subscribed from the Publisher are updated in the value field of these variables */
/* Create the TargetVariables with respect to DataSetMetaData fields */
UA_FieldTargetVariable *targetVars = (UA_FieldTargetVariable *)
UA_calloc(readerConfig.dataSetMetaData.fieldsSize, sizeof(UA_FieldTargetVariable));
for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++) {
/* Variable to subscribe data */
UA_VariableAttributes vAttr = UA_VariableAttributes_default;
UA_LocalizedText_copy(&readerConfig.dataSetMetaData.fields[i].description,
&vAttr.description);
vAttr.displayName.locale = UA_STRING("en-US");
vAttr.displayName.text = readerConfig.dataSetMetaData.fields[i].name;
vAttr.dataType = readerConfig.dataSetMetaData.fields[i].dataType;
UA_NodeId newNode;
retval |= UA_Server_addVariableNode(server, UA_NODEID_NUMERIC(1, (UA_UInt32)i + 50000),
folderId,
UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT),
UA_QUALIFIEDNAME(1, (char *)readerConfig.dataSetMetaData.fields[i].name.data),
UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
vAttr, NULL, &newNode);
/*monitor variable*/
UA_MonitoredItemCreateRequest monRequest = UA_MonitoredItemCreateRequest_default(newNode);
UA_Server_createDataChangeMonitoredItem(server, UA_TIMESTAMPSTORETURN_SOURCE,
monRequest, NULL, dataChangeNotificationCallback);
/* For creating Targetvariables */
UA_FieldTargetDataType_init(&targetVars[i].targetVariable);
targetVars[i].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE;
targetVars[i].targetVariable.targetNodeId = newNode;
}
retval = UA_Server_DataSetReader_createTargetVariables(server, dataSetReaderId,
readerConfig.dataSetMetaData.fieldsSize, targetVars);
for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++)
UA_FieldTargetDataType_clear(&targetVars[i].targetVariable);
UA_free(targetVars);
UA_free(readerConfig.dataSetMetaData.fields);
return retval;
}
/**
* **DataSetMetaData**
*
* The DataSetMetaData describes the content of a DataSet. It provides the information necessary to decode
* DataSetMessages on the Subscriber side. DataSetMessages received from the Publisher are decoded into
* DataSet and each field is updated in the Subscriber based on datatype match of TargetVariable fields of Subscriber
* and PublishedDataSetFields of Publisher */
/* Define MetaData for TargetVariables */
static void fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData) {
if(pMetaData == NULL) {
return;
}
UA_DataSetMetaDataType_init (pMetaData);
pMetaData->name = UA_STRING ("DataSet 1 (subscribed)");
/* Static definition of number of fields size to 2 to create four different
* targetVariables of distinct datatype
*/
pMetaData->fieldsSize = 2;
pMetaData->fields = (UA_FieldMetaData*)UA_Array_new (pMetaData->fieldsSize,
&UA_TYPES[UA_TYPES_FIELDMETADATA]);
/* heartbeat */
UA_FieldMetaData_init (&pMetaData->fields[0]);
UA_NodeId_copy (&UA_TYPES[UA_TYPES_FLOAT].typeId,
&pMetaData->fields[0].dataType);
pMetaData->fields[0].builtInType = UA_NS0ID_FLOAT;
pMetaData->fields[0].name = UA_STRING ("Heartbeat (subscribed)");
pMetaData->fields[0].valueRank = -1; /* scalar */
// ID
UA_FieldMetaData_init (&pMetaData->fields[1]);
UA_NodeId_copy (&UA_TYPES[UA_TYPES_UINT32].typeId,
&pMetaData->fields[1].dataType);
pMetaData->fields[1].builtInType = UA_NS0ID_UINT32;
pMetaData->fields[1].name = UA_STRING ("ID (subscribed)");
pMetaData->fields[1].valueRank = -1; /* scalar */
}
void callbackCheckHeartBeat() {
int i, coupler_id;
long int micro_seconds = getMicroSeconds();
size_t n = sizeof(HEART_BEAT_ID_LIST)/sizeof(HEART_BEAT_ID_LIST[0]);
for (int i = 0; i < n; i++) {
coupler_id = HEART_BEAT_ID_LIST[i];
if (coupler_id > 0) {
// convert to str as this is the hash key
char* coupler_id_str = convertInt2Str(coupler_id);
char *last_seen_timestamp = getItem(SUBSCRIBER_DICT, coupler_id_str);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Check ID=%s, last_seen=%s", coupler_id_str, last_seen_timestamp);
if (last_seen_timestamp != NULL){
// we do have timestamp for this coupler ID
int last_seen_timestamp_int = atoi(last_seen_timestamp);
int timestamp_delta = micro_seconds - last_seen_timestamp_int;
bool is_down = (timestamp_delta > 1000);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "\tdelta=%d, is_down=%d", timestamp_delta, is_down);
}
}
}
}
static int enableSubscribeToHeartBeat(UA_Server *server, UA_ServerConfig *config){
// enable subscribe to keep-alive messages
UA_String transportProfile = UA_STRING(DEFAULT_TRANSPORT_PROFILE);
UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING(DEFAULT_NETWORK_ADDRESS_URL)};
addPubSubConnectionXXX(server, &transportProfile, &networkAddressUrl);
/* Add ReaderGroup to the created PubSubConnection */
addReaderGroup(server);
/* Add DataSetReader to the created ReaderGroup */
addDataSetReader(server);
/* Add SubscribedVariables to the created DataSetReader */
addSubscribedVariables(server, readerIdentifier);
// add a callback which will check related coupler's heart beats
UA_UInt64 callbackId = 2;
UA_Server_addRepeatedCallback(server, callbackCheckHeartBeat, NULL, HEART_BEAT_INTERVAL, &callbackId);
}
...@@ -33,9 +33,26 @@ ...@@ -33,9 +33,26 @@
#include <open62541/plugin/pubsub_ethernet.h> #include <open62541/plugin/pubsub_ethernet.h>
#include <open62541/plugin/pubsub_udp.h> #include <open62541/plugin/pubsub_udp.h>
// global Id of coupler
static int COUPLER_ID = 0;
// global server
UA_Server *server;
// global dictionary of subscribers
dict_t *SUBSCRIBER_DICT;
// The default port of OPC-UA server // The default port of OPC-UA server
const int DEFAULT_OPC_UA_PORT = 4840; const int DEFAULT_OPC_UA_PORT = 4840;
const int DEFAULT_MODE = 0; const int DEFAULT_MODE = 0;
const int DEFAULT_ID = 0;
// OPC UA's Pub/Sub profile
char *DEFAULT_TRANSPORT_PROFILE = "http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp";
char *DEFAULT_NETWORK_ADDRESS_URL = "opc.udp://224.0.0.22:4840/";
#include "keep_alive_publisher.h"
#include "keep_alive_subscriber.h"
// CLI arguments handling // CLI arguments handling
const char *argp_program_version = "OSIE OPC-UA coupler 0.0.1"; const char *argp_program_version = "OSIE OPC-UA coupler 0.0.1";
...@@ -47,12 +64,16 @@ static struct argp_option options[] = { ...@@ -47,12 +64,16 @@ static struct argp_option options[] = {
{"device", 'd', "/dev/i2c-1", 0, "Linux block device path."}, {"device", 'd', "/dev/i2c-1", 0, "Linux block device path."},
{"slave-address-list", 's', "0x58", 0, "Comma separated list of slave I2C addresses."}, {"slave-address-list", 's', "0x58", 0, "Comma separated list of slave I2C addresses."},
{"mode", 'm', "0", 0, "Set different modes of operation of coupler. Default (0) is set attached \ {"mode", 'm', "0", 0, "Set different modes of operation of coupler. Default (0) is set attached \
I2C's state state. Virtual (1) which does NOT set any I2C slaves' state."}, I2C's state state. Virtual (1) which does NOT set any I2C slaves' state."},
{"username", 'u', "", 0, "Username."}, {"username", 'u', "", 0, "Username."},
{"password", 'w', "", 0, "Password."}, {"password", 'w', "", 0, "Password."},
{"key", 'k', "", 0, "x509 key."}, {"key", 'k', "", 0, "x509 key."},
{"certificate", 'c', "", 0, "X509 certificate."}, {"certificate", 'c', "", 0, "X509 certificate."},
{"uuid", 'i', "", 0, "UUID of coupler"}, {"id", 'i', "0", 0, "ID of coupler."},
{"heart-beat", 'b', "0", 0, "Publish heart beat to other couplers."},
{"heart-beat-interval", 't', "500", 0, "Heart beat interval in ms."},
{"heart-beat-id-list", 'l', "", 0, "Comma separated list of IDs of couplers to watch for heart beats. \
If heart beat is missing coupler goes to safe mode."},
{0} {0}
}; };
...@@ -66,7 +87,10 @@ struct arguments ...@@ -66,7 +87,10 @@ struct arguments
char *password; char *password;
char *key; char *key;
char *certificate; char *certificate;
char *uuid; int id;
bool heart_beat;
int heart_beat_interval;
char *heart_beat_id_list;
}; };
static error_t parse_opt(int key, char *arg, struct argp_state *state) static error_t parse_opt(int key, char *arg, struct argp_state *state)
...@@ -74,36 +98,45 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) ...@@ -74,36 +98,45 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state)
struct arguments *arguments = state->input; struct arguments *arguments = state->input;
switch (key) { switch (key) {
case 'p': case 'p':
arguments->port = arg ? atoi (arg) : DEFAULT_OPC_UA_PORT; arguments->port = arg ? atoi (arg) : DEFAULT_OPC_UA_PORT;
break; break;
case 'd': case 'd':
arguments->device = arg; arguments->device = arg;
break; break;
case 's': case 's':
arguments->slave_address_list = arg; arguments->slave_address_list = arg;
break; break;
case 'm': case 'm':
arguments->mode = arg ? atoi (arg) : DEFAULT_MODE; arguments->mode = arg ? atoi (arg) : DEFAULT_MODE;
break; break;
case 'u': case 'u':
arguments->username = arg; arguments->username = arg;
break; break;
case 'w': case 'w':
arguments->password = arg; arguments->password = arg;
break; break;
case 'c': case 'c':
arguments->certificate = arg; arguments->certificate = arg;
break; break;
case 'k': case 'k':
arguments->key = arg; arguments->key = arg;
break; break;
case 'i': case 'i':
arguments->uuid = arg; arguments->id = arg ? atoi (arg) : DEFAULT_ID;
break; break;
case 'b':
arguments->heart_beat = atoi (arg);
break;
case 't':
arguments->heart_beat_interval = arg ? atoi (arg) : DEFAULT_HEART_BEAT_INTERVAL;
break;
case 'l':
arguments->heart_beat_id_list = arg;
break;
case ARGP_KEY_ARG: case ARGP_KEY_ARG:
return 0; return 0;
default: default:
return ARGP_ERR_UNKNOWN; return ARGP_ERR_UNKNOWN;
} }
return 0; return 0;
} }
...@@ -118,6 +151,7 @@ static void stopHandler(int sign) ...@@ -118,6 +151,7 @@ static void stopHandler(int sign)
running = false; running = false;
} }
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
int i; int i;
...@@ -125,6 +159,11 @@ int main(int argc, char **argv) ...@@ -125,6 +159,11 @@ int main(int argc, char **argv)
long result; long result;
char *eptr; char *eptr;
// init dictionary only once$
if (SUBSCRIBER_DICT==NULL){
SUBSCRIBER_DICT = *dictAlloc();
}
// handle command line arguments // handle command line arguments
struct arguments arguments; struct arguments arguments;
arguments.port = DEFAULT_OPC_UA_PORT; arguments.port = DEFAULT_OPC_UA_PORT;
...@@ -135,7 +174,8 @@ int main(int argc, char **argv) ...@@ -135,7 +174,8 @@ int main(int argc, char **argv)
arguments.password = ""; arguments.password = "";
arguments.key = ""; arguments.key = "";
arguments.certificate = ""; arguments.certificate = "";
arguments.uuid = ""; arguments.id = DEFAULT_ID;
arguments.heart_beat_interval = DEFAULT_HEART_BEAT_INTERVAL;
argp_parse(&argp, argc, argv, 0, 0, &arguments); argp_parse(&argp, argc, argv, 0, 0, &arguments);
printf("Mode=%d\n", arguments.mode); printf("Mode=%d\n", arguments.mode);
...@@ -144,12 +184,16 @@ int main(int argc, char **argv) ...@@ -144,12 +184,16 @@ int main(int argc, char **argv)
printf("Slave address list=%s\n", arguments.slave_address_list); printf("Slave address list=%s\n", arguments.slave_address_list);
printf("Key=%s\n", arguments.key); printf("Key=%s\n", arguments.key);
printf("Certificate=%s\n", arguments.certificate); printf("Certificate=%s\n", arguments.certificate);
printf("UUID=%s\n", arguments.uuid); printf("ID=%d\n", arguments.id);
printf("Heart beat=%d\n", arguments.heart_beat);
printf("Heart beat interval=%d ms\n", arguments.heart_beat_interval);
printf("Heart beat ID list=%s\n", arguments.heart_beat_id_list);
// transfer to global variables (CLI input) // transfer to global variables (CLI input)
COUPLER_ID = arguments.id;
I2C_VIRTUAL_MODE = arguments.mode; I2C_VIRTUAL_MODE = arguments.mode;
I2C_BLOCK_DEVICE_NAME = arguments.device; I2C_BLOCK_DEVICE_NAME = arguments.device;
HEART_BEAT_INTERVAL = arguments.heart_beat_interval;
// convert arguments.slave_address_list -> I2C_SLAVE_ADDR_LIST // convert arguments.slave_address_list -> I2C_SLAVE_ADDR_LIST
i = 0; i = 0;
...@@ -162,6 +206,17 @@ int main(int argc, char **argv) ...@@ -162,6 +206,17 @@ int main(int argc, char **argv)
token = strtok(NULL, ","); token = strtok(NULL, ",");
} }
// convert arguments.heart_beat_id_list -> HEART_BEAT_ID_LIST
i = 0;
char *tk= strtok(arguments.heart_beat_id_list, ",");
while (tk != NULL)
{
// from CLI we get a comma separated list on INTs representing coupler' ID
result = strtol(tk, &eptr, 16);
HEART_BEAT_ID_LIST[i++] = result;
tk = strtok(NULL, ",");
}
// always start attached slaves from a know safe shutdown state // always start attached slaves from a know safe shutdown state
safeShutdownI2CSlaveList(); safeShutdownI2CSlaveList();
...@@ -171,12 +226,13 @@ int main(int argc, char **argv) ...@@ -171,12 +226,13 @@ int main(int argc, char **argv)
bool addx509 = strlen(arguments.key) > 0 && strlen(arguments.certificate); bool addx509 = strlen(arguments.key) > 0 && strlen(arguments.certificate);
bool addUserNamePasswordAuthentication = strlen(arguments.username) > 0 && strlen(arguments.password) > 0; bool addUserNamePasswordAuthentication = strlen(arguments.username) > 0 && strlen(arguments.password) > 0;
UA_Server *server = UA_Server_new(); //UA_Server *server = UA_Server_new();
server = UA_Server_new();
UA_ServerConfig_setMinimal(UA_Server_getConfig(server), arguments.port, NULL); UA_ServerConfig_setMinimal(UA_Server_getConfig(server), arguments.port, NULL);
UA_ServerConfig *config = UA_Server_getConfig(server); UA_ServerConfig *config = UA_Server_getConfig(server);
config->verifyRequestTimestamp = UA_RULEHANDLING_ACCEPT; config->verifyRequestTimestamp = UA_RULEHANDLING_ACCEPT;
// add variables representing physical relarys / inputs, etc // add variables representing physical relaray / inputs, etc
addVariable(server); addVariable(server);
addValueCallbackToCurrentTimeVariable(server); addValueCallbackToCurrentTimeVariable(server);
...@@ -224,6 +280,17 @@ int main(int argc, char **argv) ...@@ -224,6 +280,17 @@ int main(int argc, char **argv)
} }
#endif #endif
// enable protocol for Pub/Sub
UA_ServerConfig_addPubSubTransportLayer(config, UA_PubSubTransportLayerUDPMP());
// enable publish keep-alive messages
if (arguments.heart_beat) {
enablePublishHeartBeat(server, config);
}
// enable subscribe to keep-alive messages
enableSubscribeToHeartBeat(server, config);
// run server // run server
UA_StatusCode retval = UA_Server_run(server, &running); UA_StatusCode retval = UA_Server_run(server, &running);
UA_Server_delete(server); UA_Server_delete(server);
......
...@@ -42,6 +42,7 @@ configure-options = ...@@ -42,6 +42,7 @@ configure-options =
-DUA_NAMESPACE_ZERO=REDUCED -DUA_NAMESPACE_ZERO=REDUCED
-DUA_ENABLE_ENCRYPTION=MBEDTLS -DUA_ENABLE_ENCRYPTION=MBEDTLS
-DUA_ENABLE_ENCRYPTION_MBEDTLS=ON -DUA_ENABLE_ENCRYPTION_MBEDTLS=ON
-DUA_ENABLE_PUBSUB_INFORMATIONMODEL=ON
[osie-repository] [osie-repository]
recipe = slapos.recipe.build:gitclone recipe = slapos.recipe.build:gitclone
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment