Commit 0cfca900 authored by Claes Sjofors's avatar Claes Sjofors

Remote, remnode and remote support for RabbitMQ added

parent e30bad15
include $(pwre_dir_symbols)
-include $(pwre_kroot)/tools/bld/src/$(os_name)/$(hw_name)/$(type_name)_generic.mk
ifeq ($($(type_name)_generic_mk),)
-include $(pwre_kroot)/tools/bld/src/$(os_name)/$(type_name)_generic.mk
endif
ifeq ($($(type_name)_generic_mk),)
include $(pwre_kroot)/tools/bld/src/$(type_name)_generic.mk
endif
ifndef link_rule_mk
link_rule_mk := 1
link = $(ldxx) $(elinkflags) $(domap) -o $(export_exe) \
$(export_obj) $(objects) $(rt_msg_eobjs) \
$(pwre_conf_libdir) $(pwre_conf_libpwrremote) $(pwre_conf_libpwrrt) \
-lrabbitmq $(pwre_conf_lib)
endif
/*
* Proview Open Source Process Control.
* Copyright (C) 2005-2017 SSAB EMEA AB.
*
* This file is part of Proview.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 2 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Proview. If not, see <http://www.gnu.org/licenses/>
*
* Linking Proview statically or dynamically with other modules is
* making a combined work based on Proview. Thus, the terms and
* conditions of the GNU General Public License cover the whole
* combination.
*
* In addition, as a special exception, the copyright holders of
* Proview give you permission to, from the build function in the
* Proview Configurator, combine Proview with modules generated by the
* Proview PLC Editor to a PLC program, regardless of the license
* terms of these modules. You may copy and distribute the resulting
* combined work under the terms of your choice, provided that every
* copy of the combined work is accompanied by a complete copy of
* the source code of Proview (the version used to produce the
* combined work), being distributed under the terms of the GNU
* General Public License plus this exception.
*/
/* rs_remote_rabbitmq.c Remote transport process with rabbitmq */
/*_Include files_________________________________________________________*/
#include <amqp.h>
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "pwr_class.h"
#include "pwr_systemclasses.h"
#include "rt_gdh.h"
#include "co_time.h"
#include "co_cdh.h"
#include "rt_errh.h"
#include "pwr_baseclasses.h"
#include "pwr_remoteclasses.h"
#include "rt_pwr_msg.h"
#include "rs_remote_msg.h"
#include "rt_aproc.h"
#include "remote.h"
#include "remote_remtrans_utils.h"
#define TIME_INCR 0.02
#define debug 0
#define remote_cMsgClass 204
typedef struct {
amqp_connection_state_t conn;
amqp_socket_t *socket;
amqp_channel_t channel;
pwr_sClass_RemnodeRabbitMQ *op;
int is_producer;
int is_consumer;
} rabbit_sCtx, *rabbit_tCtx;
typedef struct {
unsigned int msg_size;
unsigned short int msg_id[2];
} rabbit_header;
static rabbit_tCtx ctx = 0;
static remnode_item rn;
static pwr_sClass_RemnodeRabbitMQ *rn_rmq;
/*************************************************************************
**************************************************************************
*
* RemoteSleep
*
**************************************************************************
**************************************************************************/
void RemoteSleep(float time)
{
struct timespec rqtp, rmtp;
rqtp.tv_sec = 0;
rqtp.tv_nsec = (long int) (time * 1000000000);
nanosleep(&rqtp, &rmtp);
return;
}
void rmq_close( int destroy)
{
if ( ctx->channel) {
amqp_channel_close( ctx->conn, ctx->channel, AMQP_REPLY_SUCCESS);
ctx->channel = 0;
}
if ( ctx->socket) {
amqp_connection_close( ctx->conn, AMQP_REPLY_SUCCESS);
ctx->socket = 0;
}
if ( destroy) {
amqp_destroy_connection( ctx->conn);
ctx->conn = 0;
}
}
/*************************************************************************
**************************************************************************
*
* Namn : rmq_connect
*
* Typ : int
*
* Typ Parameter IOGF Beskrivning
*
* Beskrivning : Connect to RabbitMQ broker.
*
**************************************************************************
**************************************************************************/
int rmq_connect()
{
int sts;
amqp_rpc_reply_t rep;
amqp_channel_open_ok_t *co;
if ( !ctx->conn) {
ctx->conn = amqp_new_connection();
printf( "Connection : %u\n", (unsigned int)ctx->conn);
}
if ( !ctx->socket) {
ctx->socket = (amqp_socket_t *)amqp_tcp_socket_new( ctx->conn);
if ( !ctx->socket) {
printf( "Socket error\n");
return 0;
}
sts = amqp_socket_open( ctx->socket, ctx->op->Server, ctx->op->Port);
if ( sts) {
printf( "Socket open error %d\n", sts);
ctx->socket = 0;
return 0;
}
}
rep = amqp_login(ctx->conn, "/", 0, 131072, 0,
AMQP_SASL_METHOD_PLAIN, ctx->op->User, ctx->op->Password);
if ( rep.reply_type != AMQP_RESPONSE_NORMAL) {
if ( rep.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION)
printf( "Login failure, not authorized? %d\n", rep.reply_type);
else
printf( "Login failure: %d\n", rep.reply_type);
return 0;
}
if ( !ctx->channel) {
if ( ctx->op->Channel == 0)
ctx->channel = 1;
else
ctx->channel = ctx->op->Channel;
co = amqp_channel_open( ctx->conn, ctx->channel);
if ( !co) {
printf( "Channel not open\n");
ctx->channel = 0;
}
else {
printf( "Channel open %s\n", (char *)co->channel_id.bytes);
}
}
/* Declare send queue */
if ( ctx->is_producer) {
// 0 passive 0 durable 0 exclusive 0 auto-delete
amqp_queue_declare_ok_t *qd = amqp_queue_declare( ctx->conn, ctx->channel, amqp_cstring_bytes(ctx->op->SendQueue),
0, ctx->op->Durable, 0, 0, amqp_empty_table);
if ( !qd) {
printf( "SendQueue not declared\n");
}
else {
printf( "SendQueue %s message cnt %d, consumer cnt %d\n", (char *)qd->queue.bytes,
qd->message_count, qd->consumer_count);
}
}
/* Declare receive queue */
if ( ctx->is_consumer) {
// 0 passive 0 durable 0 exclusive 0 auto-delete
amqp_queue_declare_ok_t *qd = amqp_queue_declare( ctx->conn, ctx->channel, amqp_cstring_bytes(ctx->op->ReceiveQueue),
0, ctx->op->Durable, 0, 0, amqp_empty_table);
if ( !qd) {
printf( "ReceiveQueue not declared\n");
}
else {
printf( "ReceiveQueue %s message cnt %d, consumer cnt %d\n", (char *)qd->queue.bytes,
qd->message_count, qd->consumer_count);
}
}
if ( ctx->is_producer && strcmp( ctx->op->Exchange, "") != 0)
amqp_exchange_declare( ctx->conn, ctx->channel, amqp_cstring_bytes(ctx->op->Exchange),
amqp_cstring_bytes("fanout"), 0, ctx->op->Durable, amqp_empty_table);
if ( ctx->is_producer && strcmp( ctx->op->Exchange, "") != 0)
amqp_queue_bind(ctx->conn, ctx->channel, amqp_cstring_bytes(ctx->op->SendQueue),
amqp_cstring_bytes(ctx->op->Exchange), amqp_cstring_bytes("exchange-key"), amqp_empty_table);
amqp_basic_consume_ok_t *bc;
// 0 no-local 1 no-ack 0 exclusive
if ( ctx->is_consumer) {
bc = amqp_basic_consume( ctx->conn, ctx->channel, amqp_cstring_bytes(ctx->op->ReceiveQueue), amqp_empty_bytes,
0, !ctx->op->Acknowledge, 0, amqp_empty_table);
if ( !bc)
printf( "Consumer error\n");
else
printf( "Consumer tag: %s\n", (char *)bc->consumer_tag.bytes);
}
return 1;
}
/*************************************************************************
**************************************************************************
*
* Namn : rmq_receive
*
* Typ : unsigned int
*
* Typ Parameter IOGF Beskrivning
*
* Beskrivning : Invoked when a RabbitMQ message is received.
*
**************************************************************************
**************************************************************************/
unsigned int rmq_receive()
{
pwr_tStatus sts;
int search_remtrans = 0;
remtrans_item *remtrans;
amqp_rpc_reply_t ret;
amqp_envelope_t envelope;
struct timeval t = {2,0};
rabbit_header header;
int msg_received = 0;
amqp_maybe_release_buffers(ctx->conn);
ret = amqp_consume_message(ctx->conn, &envelope, &t, 0);
switch ( ret.reply_type) {
case AMQP_RESPONSE_NORMAL: {
break;
}
case AMQP_RESPONSE_NONE:
return REM__SUCCESS;
case AMQP_RESPONSE_SERVER_EXCEPTION:
return REM__EXCEPTION;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
switch( ret.library_error) {
case AMQP_STATUS_TIMEOUT: {
amqp_destroy_envelope( &envelope);
return REM__TIMEOUT;
}
case AMQP_STATUS_UNEXPECTED_STATE: {
amqp_frame_t frame;
sts = amqp_simple_wait_frame_noblock( ctx->conn, &frame, &t);
if ( sts == AMQP_STATUS_TIMEOUT) {
printf( "Wait frame timeout\n");
return REM__EXCEPTION;
}
else if ( sts == AMQP_STATUS_OK) {
if ( frame.frame_type == AMQP_FRAME_METHOD) {
switch ( frame.payload.method.id) {
case AMQP_BASIC_ACK_METHOD:
printf( "Basic ack method called\n");
break;
case AMQP_BASIC_RETURN_METHOD:
printf( "Basic return method called\n");
break;
case AMQP_CHANNEL_CLOSE_METHOD:
printf( "Channel close method called\n");
break;
case AMQP_CONNECTION_CLOSE_METHOD:
printf( "Connection close method called\n");
break;
default: ;
}
}
return REM__EXCEPTION;
}
else
return REM__EXCEPTION;
}
}
// Reconnect...
rmq_close( 1);
return REM__EXCEPTION;
default:
printf( "Unknown Reply type: %d\n", ret.reply_type);
}
if (debug) printf("Received message %d\n", envelope.message.body.len);
if ( envelope.message.body.len > 0 && rn_rmq->DisableHeader) {
/* Header disabled, take the first receive remtrans object */
remtrans = rn.remtrans;
search_remtrans = 1;
while(remtrans && search_remtrans) {
/* Match? */
if (remtrans->objp->Direction == REMTRANS_IN) {
search_remtrans = false;
sts = RemTrans_Receive(remtrans, (char *)envelope.message.body.bytes, envelope.message.body.len);
msg_received = 1;
}
remtrans = (remtrans_item *) remtrans->next;
}
if (search_remtrans) {
rn_rmq->ErrCount++;
errh_Info("RabbitMQ Receive no remtrans %s", rn_rmq->ReceiveQueue);
}
}
else if ( envelope.message.body.len >= sizeof(rabbit_header)) {
memcpy(&header, envelope.message.body.bytes, sizeof(rabbit_header));
/* Convert the header to host byte order */
header.msg_size = ntohs(header.msg_size);
header.msg_id[0] = ntohs(header.msg_id[0]);
header.msg_id[1] = ntohs(header.msg_id[1]);
search_remtrans = 1;
remtrans = rn.remtrans;
while(remtrans && search_remtrans) {
if (remtrans->objp->Address[0] == header.msg_id[0] &&
remtrans->objp->Address[1] == header.msg_id[1] &&
remtrans->objp->Direction == REMTRANS_IN &&
remtrans->objp->DataValid == 0) {
search_remtrans = false;
sts = RemTrans_Receive(remtrans, (char *)envelope.message.body.bytes + sizeof(rabbit_header), envelope.message.body.len);
if (sts != STATUS_OK && sts != STATUS_BUFF)
errh_Error("Error from RemTrans_Receive, queue %s, status %d", rn_rmq->ReceiveQueue, sts, 0);
msg_received = 1;
break;
}
remtrans = (remtrans_item *) remtrans->next;
}
if (search_remtrans) {
rn_rmq->ErrCount++;
errh_Info("No remtrans for received message, queue %s, class %d, type %d", rn_rmq->ReceiveQueue, header.msg_id[0], header.msg_id[1]);
}
}
if ( ctx->op->Acknowledge) {
if ( msg_received)
amqp_basic_ack( ctx->conn, ctx->channel, envelope.delivery_tag, 0);
else
/* Requeue the message */
amqp_basic_nack( ctx->conn, ctx->channel, envelope.delivery_tag, 0, 1);
}
amqp_destroy_envelope( &envelope);
return sts;
}
/*************************************************************************
**************************************************************************
*
* Namn : rmq_send
*
* Typ : unsigned int
*
* Typ Parameter IOGF Beskrivning
*
* Beskrivning : Sends a RabbitMQ message to remote node
*
**************************************************************************
**************************************************************************/
unsigned int rmq_send( remnode_item *remnode,
pwr_sClass_RemTrans *remtrans,
char *buf,
int buf_size)
{
int sts;
amqp_basic_properties_t prop;
amqp_bytes_t msg;
char *tmpbuf;
unsigned int tmpbuf_size;
if (rn_rmq->DisableHeader) {
msg.bytes = buf;
msg.len = buf_size;
}
else {
tmpbuf_size = sizeof(rabbit_header) + buf_size;
tmpbuf = malloc( tmpbuf_size);
memcpy( tmpbuf + sizeof(rabbit_header), buf, buf_size);
((rabbit_header *)tmpbuf)->msg_size = htons(tmpbuf_size);
((rabbit_header *)tmpbuf)->msg_id[0] = htons(remtrans->Address[0]);
((rabbit_header *)tmpbuf)->msg_id[1] = htons(remtrans->Address[1]);
msg.bytes = tmpbuf;
msg.len = tmpbuf_size;
}
prop.delivery_mode = 2;
prop._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
// 0 mandatory 0 immediate
if ( strcmp( ctx->op->Exchange, "") != 0)
sts = amqp_basic_publish( ctx->conn, ctx->channel, amqp_cstring_bytes(ctx->op->Exchange),
amqp_cstring_bytes(""), 0, 0, &prop, msg);
else
sts = amqp_basic_publish( ctx->conn, ctx->channel, amqp_cstring_bytes(ctx->op->Exchange),
amqp_cstring_bytes(ctx->op->SendQueue), 0, 0, &prop, msg);
if ( sts) {
remtrans->ErrCount++;
errh_Error("Send failed, queue %s, RabbitMQ status %d", rn_rmq->ReceiveQueue, sts, 0);
if (debug) printf("Send failed sts:%d\n", (int) sts);
}
if ( !rn_rmq->DisableHeader)
free( tmpbuf);
return STATUS_OK;
}
/*************************************************************************
**************************************************************************
*
* Main
*
**************************************************************************
**************************************************************************/
int main(int argc, char *argv[])
{
remtrans_item *remtrans;
unsigned char id[32];
unsigned char pname[32];
pwr_tStatus sts;
int i;
float time_since_scan = 0.0;
/* Read arg number 2, should be id for this instance and id is our queue number */
if (argc >= 2)
strcpy((char *)id, argv[1]);
else
strcpy((char *)id, "0");
/* Build process name with id */
sprintf((char *) pname, "rs_remrabbitmq_%s", id);
/* Init of errh */
errh_Init((char *) pname, errh_eAnix_remote);
errh_SetStatus(PWR__SRVSTARTUP);
/* Init of gdh */
if (debug) printf("Before gdh_init\n");
sts = gdh_Init((char *) pname);
if ( EVEN(sts)) {
errh_Fatal("gdh_Init, %m", sts);
errh_SetStatus(PWR__SRVTERM);
exit(sts);
}
/* Arg number 3 should be my remnodes objid in string representation,
read it, convert to real objid and store in remnode_item */
sts = 0;
if (argc >= 3) sts = cdh_StringToObjid(argv[2], &rn.objid);
if ( EVEN(sts)) {
errh_Fatal("cdh_StringToObjid, %m", sts);
errh_SetStatus(PWR__SRVTERM);
exit(sts);
}
/* Get pointer to RemnodeRabbitMQ object and store locally */
sts = gdh_ObjidToPointer(rn.objid, (pwr_tAddress *) &rn_rmq);
if ( EVEN(sts)) {
errh_Fatal("cdh_ObjidToPointer, %m", sts);
errh_SetStatus(PWR__SRVTERM);
exit(sts);
}
if ( strcmp( rn_rmq->ReceiveQueue, "") == 0 && strcmp( rn_rmq->SendQueue, "") == 0) {
errh_Fatal("Process terminated, neither send or receive queue configured, %s", id);
errh_SetStatus(PWR__SRVTERM);
exit(sts);
}
/* Create context */
ctx = calloc( 1, sizeof(*ctx));
ctx->op = rn_rmq;
if ( strcmp(rn_rmq->ReceiveQueue, "") != 0)
ctx->is_consumer = 1;
if ( strcmp(rn_rmq->SendQueue, "") != 0)
ctx->is_producer = 1;
/* Initialize some internal data and make standard remtrans init */
rn.next = NULL;
rn.local = NULL; // We dont use local structure since we only have one remnode
rn_rmq->ErrCount = 0;
if (debug) printf("Before remtrans_init\n");
sts = RemTrans_Init(&rn);
if ( EVEN(sts)) {
errh_Fatal("RemTrans_Init, %m", sts);
errh_SetStatus(PWR__SRVTERM);
exit(sts);
}
/* Store remtrans objects objid in remnode_qcom object */
remtrans = rn.remtrans;
i = 0;
while(remtrans) {
rn_rmq->RemTransObjects[i++] = remtrans->objid;
if ( i >= (int)(sizeof(rn_rmq->RemTransObjects)/sizeof(rn_rmq->RemTransObjects[0])))
break;
remtrans = (remtrans_item *) remtrans->next;
}
/* Connect to rabbitmq broker */
sts = rmq_connect();
if ( EVEN(sts)) {
rmq_close(1);
errh_Fatal("Process terminated, unable to connect to RabbitMQ, %s", id);
errh_SetStatus(PWR__SRVTERM);
exit(sts);
}
/* Set running status */
errh_SetStatus(PWR__SRUN);
/* Set (re)start time in remnode object */
time_GetTime(&rn_rmq->RestartTime);
/* Loop forever */
while (!doomsday) {
if ( rn_rmq->Disable == 1) {
errh_Fatal("Disabled, exiting");
errh_SetStatus(PWR__SRVTERM);
exit(0);
}
aproc_TimeStamp(TIME_INCR, 5);
if ( ctx->is_consumer)
sts = rmq_receive();
else
RemoteSleep(TIME_INCR);
time_since_scan += TIME_INCR;
if ( time_since_scan >= rn_rmq->ScanTime) {
if ( ctx->is_producer)
sts = RemTrans_Cyclic(&rn, &rmq_send);
time_since_scan = 0.0;
}
}
}
...@@ -187,6 +187,30 @@ static void AddTransports() ...@@ -187,6 +187,30 @@ static void AddTransports()
sts = gdh_GetNextObject (objid, &objid); sts = gdh_GetNextObject (objid, &objid);
} }
/* Get and configure all RabbitMQ remnodes, one process for each remnode */
sts = gdh_GetClassList (pwr_cClass_RemnodeRabbitMQ, &objid);
while ( ODD(sts))
{
sts = gdh_ObjidToPointer(objid, &objref);
sprintf(tp[tpcount].path, "rs_remote_rabbitmq");
tp[tpcount].id = id++;
tp[tpcount].disable = &((pwr_sClass_RemnodeRabbitMQ *) objref)->Disable;
tp[tpcount].restart_limit = &((pwr_sClass_RemnodeRabbitMQ *) objref)->RestartLimit;
tp[tpcount].restarts = &((pwr_sClass_RemnodeRabbitMQ *) objref)->RestartCount;
((pwr_sClass_RemnodeMQ *) objref)->RestartCount = 0;
tp[tpcount].objid = objid;
tp[tpcount].objref = objref;
tp[tpcount].classid = pwr_cClass_RemnodeRabbitMQ;
tp[tpcount].cpid = -1;
tp[tpcount].first = true;
remcfgp->RemNodeObjects[tpcount] = objid;
tpcount++;
sts = gdh_GetNextObject (objid, &objid);
}
/* Get and configure all QCom remnodes, one process for each remnode */ /* Get and configure all QCom remnodes, one process for each remnode */
sts = gdh_GetClassList (pwr_cClass_RemnodeQCom, &objid); sts = gdh_GetClassList (pwr_cClass_RemnodeQCom, &objid);
......
!
! Proview Open Source Process Control.
! Copyright (C) 2005-2017 SSAB EMEA AB.
!
! This file is part of Proview.
!
! This program is free software; you can redistribute it and/or
! modify it under the terms of the GNU General Public License as
! published by the Free Software Foundation, either version 2 of
! the License, or (at your option) any later version.
!
! This program is distributed in the hope that it will be useful
! but WITHOUT ANY WARRANTY; without even the implied warranty of
! MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
! GNU General Public License for more details.
!
! You should have received a copy of the GNU General Public License
! along with Proview. If not, see <http://www.gnu.org/licenses/>
!
! Linking Proview statically or dynamically with other modules is
! making a combined work based on Proview. Thus, the terms and
! conditions of the GNU General Public License cover the whole
! combination.
!
! In addition, as a special exception, the copyright holders of
! Proview give you permission to, from the build function in the
! Proview Configurator, combine Proview with modules generated by the
! Proview PLC Editor to a PLC program, regardless of the license
! terms of these modules. You may copy and distribute the resulting
! combined work under the terms of your choice, provided that every
! copy of the combined work is accompanied by a complete copy of
! the source code of Proview (the version used to produce the
! combined work), being distributed under the terms of the GNU
! General Public License plus this exception.
!
! remote_c_remnoderabbitmq.wb_load -- Defines the class RemnodeRabbitMQ.
!
SObject Remote:Class
!/**
! @Version 1.0
! @Group Servers,NodeConfiguration
! @Summary Configures communication through a message queue using RabbitMQ.
! Configures communication through a message queue using RabbitMQ.
!
! To direct a message from a sending RemTrans to a recieving RemTrans set
! same value (0 - 255) in Address[0] in both sending and receiving RemTrans.
!
! @b Object graph
! @image orm_remnoderabbitmq_og.png
!
! @b See also
! @classlink RemoteConfig remote_remoteconfig.html
! @classlink RemTrans remote_remtrans.html
!*/
Object RemnodeRabbitMQ $ClassDef 35
Body SysBody
Attr Editor = pwr_eEditor_AttrEd
Attr Method = pwr_eMethod_Standard
Attr PopEditor = 2
EndBody
Object RtBody $ObjBodyDef 1
Body SysBody
Attr StructName = "RemnodeRabbitMQ"
EndBody
!/**
! Optional description.
!*/
Object Description $Attribute 1
Body SysBody
Attr TypeRef = "pwrs:Type-$String80"
EndBody
EndObject
!/**
! Process priority for the transport process. For future use.
!*/
Object Prio $Attribute 2
Body SysBody
Attr TypeRef = "pwrs:Type-$Int32"
EndBody
EndObject
!/**
! Server node or broker.
!*/
Object Server $Attribute 3
Body SysBody
Attr TypeRef = "pwrs:Type-$String80"
EndBody
EndObject
!/**
! RabbitMQ port. The default port for RabbitMQ is 5672.
!*/
Object Port $Attribute 4
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
EndBody
EndObject
!/**
! RabbitMQ channel.
!*/
Object Channel $Attribute 5
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
EndBody
EndObject
!/**
! Name of exchange. If empty, the default exchange is used.
!*/
Object Exchange $Attribute 6
Body SysBody
Attr TypeRef = "pwrs:Type-$String80"
EndBody
EndObject
!/**
! Name of send queue.
! The queue is created if it doesn't exist.
!*/
Object SendQueue $Attribute 7
Body SysBody
Attr TypeRef = "pwrs:Type-$String80"
EndBody
EndObject
!/**
! Name of receive queue.
! The queue is created if it doesn't exist.
!*/
Object ReceiveQueue $Attribute 8
Body SysBody
Attr TypeRef = "pwrs:Type-$String80"
EndBody
EndObject
!/**
! Username for broker connection.
! The user has to be created in the broker.
!*/
Object User $Attribute 9
Body SysBody
Attr TypeRef = "pwrs:Type-$String40"
Attr Flags |= PWR_MASK_CONST
EndBody
EndObject
!/**
! Password for the broker connection.
!*/
Object Password $Attribute 10
Body SysBody
Attr TypeRef = "pwrs:Type-$String40"
Attr Flags |= PWR_MASK_CONST
Attr Flags |= PWR_MASK_RTHIDE
Attr Flags |= PWR_MASK_DEVHIDEVALUE
EndBody
EndObject
!/**
! Acknowlege is required for all messages.
! Send messages will not removed from the queue unless an aknowlege is received
! from the receiver.
! For received messages an acknowlege messages is send to the broker.
! If Acknowlege is not sent, messages will be removed from the queue when they are sent.
!*/
Object Acknowledge $Attribute 11
Body SysBody
Attr TypeRef = "pwrs:Type-$Boolean"
EndBody
EndObject
!/**
! Queued messages are stored on disk by the broker, and will be recovered after a
! restart.
!*/
Object Durable $Attribute 12
Body SysBody
Attr TypeRef = "pwrs:Type-$Boolean"
EndBody
EndObject
!/**
! Scantime in seconds for outgoing RemTrans messages.
! Dynamic change is possible.
!*/
Object ScanTime $Attribute 14
Body SysBody
Attr TypeRef = "pwrs:Type-$Float32"
EndBody
EndObject
!/**
! When set, this attribute tells the remote handler not to start
! or restart the process that handles this remote node. If the transport process
! is running while the attribute is set it will terminate. This can be used to force
! a restart of the process.
!*/
Object Disable $Attribute 15
Body SysBody
Attr TypeRef = "pwrs:Type-$Boolean"
EndBody
EndObject
!/**
! This attribute shows how many times the remote handler has restarted the
! transport process that handles this remnode.
! Dynamic change is possible and can be used in order to earn more restarts.
!*/
Object RestartCount $Attribute 16
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
Attr Flags |= PWR_MASK_STATE
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_INVISIBLE
EndBody
EndObject
!/**
! The restart limit tells the remote handler how many times the transport process
! that handle this remnode can be restarted.
! Dynamic change is possible and can be used in order to earn more restarts.
!*/
Object RestartLimit $Attribute 17
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
EndBody
EndObject
!/**
! The restart time is set by the transport process at startup and therefore
! shows the latest (re)starttime.
!*/
Object RestartTime $Attribute 18
Body SysBody
Attr TypeRef = "pwrs:Type-$Time"
Attr Flags |= PWR_MASK_STATE
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_INVISIBLE
EndBody
EndObject
!/**
! When set, this attribute disables use of the special Remote header
! in each send message. Furthermore, when receiving a message, the whole part
! of the message is treated as data which means that it is not possible to
! tell which ingoing RemTrans object the message is meant for since this information
! lies in the header. Therefore each incoming message is placed in the first found
! RemTrans object.
! Dynamic change is possible.
!*/
Object DisableHeader $Attribute 19
Body SysBody
Attr TypeRef = "pwrs:Type-$Boolean"
EndBody
EndObject
!/**
! Error counter.
!*/
Object ErrCount $Attribute 20
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
Attr Flags |= PWR_MASK_STATE
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_INVISIBLE
EndBody
EndObject
!/**
! Type of remnode. Used in the RemoteConfig classgraph.
!*/
Object Id $Attribute 21
Body SysBody
Attr TypeRef = "pwrs:Type-$String8"
Attr Flags |= PWR_MASK_INVISIBLE
EndBody
EndObject
!/**
! Contains the objid for the RemTrans objects for this remnode.
! The objid's are inserted by the remote process.
!*/
Object RemTransObjects $Attribute 22
Body SysBody
Attr TypeRef = "pwrs:Type-$Objid"
Attr Flags |= PWR_MASK_STATE
Attr Flags |= PWR_MASK_INVISIBLE
Attr Flags |= PWR_MASK_ARRAY
Attr Elements = 25
EndBody
EndObject
EndObject
Object Template RemnodeRabbitMQ
Body RtBody
Attr Server = "localhost"
Attr Port = 5672
Attr Channel = 1
Attr User = "guest"
Attr Password = "guest"
Attr Prio = 15
Attr RestartLimit = 100
Attr ScanTime = 0.1
Attr Id = "RabbitM"
EndBody
EndObject
EndObject
EndSObject
...@@ -60,6 +60,7 @@ pkg_install_func () ...@@ -60,6 +60,7 @@ pkg_install_func ()
chmod a+x pkg_unpack.sh chmod a+x pkg_unpack.sh
./pkg_unpack.sh $pkg ./pkg_unpack.sh $pkg
rm ./pkg_unpack.sh
} }
pkg_list_func () pkg_list_func ()
......
...@@ -82,3 +82,4 @@ udpweirdheader <UDP message with illegal header received> /info ...@@ -82,3 +82,4 @@ udpweirdheader <UDP message with illegal header received> /info
disorder <Message disorder> /error disorder <Message disorder> /error
disabled <Transport disabled> /error disabled <Transport disabled> /error
udpnocon <No UDP connection> /error udpnocon <No UDP connection> /error
exception <Exception from connection> /error
...@@ -1737,7 +1737,7 @@ int XNav::show_remnode() ...@@ -1737,7 +1737,7 @@ int XNav::show_remnode()
strcpy( th.title[th.table_cnt++], "Description"); strcpy( th.title[th.table_cnt++], "Description");
new ItemTableHeader( brow, this, "Title", &th, NULL, flow_eDest_IntoLast); new ItemTableHeader( brow, this, "Title", &th, NULL, flow_eDest_IntoLast);
for ( int i = 0; i < 9; i++) { for ( int i = 0; i < 10; i++) {
switch ( i) { switch ( i) {
case 0: cid = pwr_cClass_RemnodeUDP; break; case 0: cid = pwr_cClass_RemnodeUDP; break;
case 1: cid = pwr_cClass_RemnodeTCP; break; case 1: cid = pwr_cClass_RemnodeTCP; break;
...@@ -1748,6 +1748,7 @@ int XNav::show_remnode() ...@@ -1748,6 +1748,7 @@ int XNav::show_remnode()
case 6: cid = pwr_cClass_RemnodeMQ; break; case 6: cid = pwr_cClass_RemnodeMQ; break;
case 7: cid = pwr_cClass_RemnodeWMQ; break; case 7: cid = pwr_cClass_RemnodeWMQ; break;
case 8: cid = pwr_cClass_RemnodeQCom; break; case 8: cid = pwr_cClass_RemnodeQCom; break;
case 9: cid = pwr_cClass_RemnodeRabbitMQ; break;
} }
sts = gdh_GetClassList( cid, &objid); sts = gdh_GetClassList( cid, &objid);
...@@ -1805,6 +1806,11 @@ int XNav::show_remnode() ...@@ -1805,6 +1806,11 @@ int XNav::show_remnode()
strncpy( description, ((pwr_sClass_RemnodeQCom *)object_ptr)->Description, strncpy( description, ((pwr_sClass_RemnodeQCom *)object_ptr)->Description,
sizeof(description)); sizeof(description));
break; break;
case 9:
strncpy( id, ((pwr_sClass_RemnodeRabbitMQ *)object_ptr)->Id, sizeof(id));
strncpy( description, ((pwr_sClass_RemnodeRabbitMQ *)object_ptr)->Description,
sizeof(description));
break;
} }
t.elem_cnt = 0; t.elem_cnt = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment