Commit 74130376 authored by Robert Karlsson's avatar Robert Karlsson

Added remote handling for Webshpere MQ

parent e50fa1a4
include $(pwre_dir_symbols)
-include $(pwre_kroot)/tools/bld/src/$(os_name)/$(hw_name)/$(type_name)_generic.mk
ifeq ($($(type_name)_generic_mk),)
-include $(pwre_kroot)/tools/bld/src/$(os_name)/$(type_name)_generic.mk
endif
ifeq ($($(type_name)_generic_mk),)
include $(pwre_kroot)/tools/bld/src/$(type_name)_generic.mk
endif
ifndef link_rule_mk
link_rule_mk := 1
ifeq ($(PWRE_CONF_WMQ),1)
link = $(ldxx) $(elinkflags) $(domap) -o $(export_exe) \
$(export_obj) $(objects) $(rt_msg_eobjs) \
$(pwre_conf_libdir) $(pwre_conf_libpwrremote) $(pwre_conf_libpwrrt) $(pwre_conf_lib)
else
link = echo "WMQ not installed"
endif
endif
/*
* Proview $Id: rs_remote_mq.c,v 1.3 2006-04-24 13:22:23 claes Exp $
* Copyright (C) 2005 SSAB Oxelsund AB.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 2 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with the program, if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
/*************************************************************************
* ===============
* P r o v i e w
* ===============
**************************************************************************
*
* Filename: rs_remote_wmq.c
*
* Description: Remote transport process for Websphere Message Queue
* as a client
* For further information, please refer to Webspherer MQ
* documentation.
*
* Change log: 2010-12-08, Robert Karlsson
* First version introduced in 4.8.0-2
*
*
**************************************************************************
**************************************************************************/
/*_Include files_________________________________________________________*/
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include "pwr_class.h"
#include "pwr_systemclasses.h"
#include "rt_gdh.h"
#include "co_time.h"
#include "co_cdh.h"
#include "rt_errh.h"
#include "pwr_baseclasses.h"
#include "pwr_remoteclasses.h"
#include "rt_pwr_msg.h"
#include "rt_aproc.h"
#include "remote.h"
#include "remote_remtrans_utils.h"
// Message Q include files
#include <cmqc.h>
#define TIME_INCR 0.02
#define debug 0
remnode_item rn;
pwr_sClass_RemnodeWMQ *rn_wmq;
char mgr_name[MQ_Q_MGR_NAME_LENGTH];
MQHCONN Hconn;
MQOD RcvObjDesc = {MQOD_DEFAULT};
MQLONG RcvOpenOptions; // options that control the open-call
MQOD SndObjDesc = {MQOD_DEFAULT};
MQLONG SndOpenOptions; // options that control the open-call
char rcv_que_name[MQ_Q_NAME_LENGTH];
char snd_que_name[MQ_Q_NAME_LENGTH];
MQHOBJ RcvHobj; // object handle
MQHOBJ SndHobj; // object handle
/*************************************************************************
**************************************************************************
*
* RemoteSleep
*
**************************************************************************
**************************************************************************/
void RemoteSleep(float time)
{
struct timespec rqtp, rmtp;
rqtp.tv_sec = 0;
rqtp.tv_nsec = (long int) (time * 1000000000);
nanosleep(&rqtp, &rmtp);
return;
}
/*************************************************************************
**************************************************************************
*
* Namn : wmq_receive
*
* Typ : unsigned int
*
* Typ Parameter IOGF Beskrivning
*
* Beskrivning : Invoked when a MQ message is received.
*
**************************************************************************
**************************************************************************/
unsigned int wmq_receive()
{
MQLONG CompCode;
MQLONG Reason;
MQMD MsgDesc = {MQMD_DEFAULT};
MQLONG DataLength;
MQCHAR Buffer[16384];
MQGMO GetMsgOpts = {MQGMO_DEFAULT};
MQLONG BufferLength = sizeof(Buffer);
unsigned int sts;
char search_remtrans;
remtrans_item *remtrans;
/* Set options */
GetMsgOpts.Options = MQGMO_NO_WAIT + MQGMO_ACCEPT_TRUNCATED_MSG;
MsgDesc.Encoding = MQENC_NATIVE;
MsgDesc.CodedCharSetId = MQCCSI_Q_MGR;
/* Get message */
MQGET(Hconn, RcvHobj, &MsgDesc, &GetMsgOpts, BufferLength, Buffer, &DataLength, &CompCode, &Reason);
if (CompCode != MQCC_FAILED) {
if (debug) printf("Received message %d\n", (int) DataLength);
search_remtrans = true;
remtrans = rn.remtrans;
while(remtrans && search_remtrans) {
if ((strncmp(remtrans->objp->TransName, (char *) MsgDesc.MsgId, MQ_MSG_ID_LENGTH) == 0) &&
(remtrans->objp->Direction == REMTRANS_IN)) {
search_remtrans = false;
sts = RemTrans_Receive(remtrans, (char *) &Buffer, DataLength);
if (sts != STATUS_OK && sts != STATUS_BUFF)
errh_Error("Error from RemTrans_Receive, status %d", sts, 0);
break;
}
remtrans = (remtrans_item *) remtrans->next;
}
if (search_remtrans) {
rn_wmq->ErrCount++;
errh_Info("No remtrans for received message, msgid %s", MsgDesc.MsgId, 0);
}
}
else if (Reason != MQRC_NO_MSG_AVAILABLE) {
rn_wmq->ErrCount++;
errh_Error("Receive failed, reason %d", Reason, 0);
}
return(sts);
}
/*************************************************************************
**************************************************************************
*
* Namn : wmq_send
*
* Typ : unsigned int
*
* Typ Parameter IOGF Beskrivning
*
* Beskrivning : Sends a MQ message to Remote node
*
**************************************************************************
**************************************************************************/
unsigned int wmq_send(remnode_item *remnode,
pwr_sClass_RemTrans *remtrans,
char *buf,
int buf_size)
{
MQLONG CompCode;
MQLONG Reason;
MQPMO pmo = {MQPMO_DEFAULT}; /* put message options */
// MQOD od = {MQOD_DEFAULT}; /* Object Descriptor */
MQMD md = {MQMD_DEFAULT}; /* Message Descriptor */
MQLONG messlen; /* message length */
pmo.Options = MQPMO_NO_SYNCPOINT | MQPMO_FAIL_IF_QUIESCING;
// pmo.Options |= MQPMO_NEW_MSG_ID;
// pmo.Options |= MQPMO_NEW_CORREL_ID;
strncpy((char *) md.MsgId, remtrans->TransName, MQ_MSG_ID_LENGTH) ;
memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId));
if ((remtrans->Address[0] <= MQPER_PERSISTENCE_AS_Q_DEF) &&
(remtrans->Address[0] >= MQPER_NOT_PERSISTENT))
md.Persistence = remtrans->Address[0];
else
md.Persistence = MQPER_NOT_PERSISTENT; // | MQPRE_NOT_PERSISTENT
messlen = buf_size;
MQPUT(Hconn, /* connection handle */
SndHobj, /* object handle */
&md, /* message descriptor */
&pmo, /* default options (datagram) */
messlen, /* message length */
buf, /* message buffer */
&CompCode, /* completion code */
&Reason); /* reason code */
/* report reason, if any */
if (Reason != MQRC_NONE) {
remtrans->ErrCount++;
errh_Error("Send failed, msgid %s, Reason %d", md.MsgId, Reason, 0);
printf("MQPUT ended with reason code %d\n", (int) Reason);
}
// if (debug) printf("Sent message %d\n", (int) mq_sts);
return( STATUS_OK );
}
/*************************************************************************
**************************************************************************
*
* Main
*
**************************************************************************
**************************************************************************/
int main(int argc, char *argv[])
{
remtrans_item *remtrans;
unsigned char id[32];
unsigned char pname[32];
pwr_tStatus sts;
int i;
float time_since_scan = 0.0;
MQLONG CompCode;
MQLONG Reason;
/* Read arg number 2, should be id for this instance and id is our queue number */
if (argc >= 2)
strcpy((char *)id, argv[1]);
else
strcpy((char *)id, "0");
/* Build process name with id */
sprintf((char *) pname, "rs_remwmq_%s", id);
/* Init of errh */
errh_Init((char *) pname, errh_eAnix_remote);
errh_SetStatus(PWR__SRVSTARTUP);
/* Init of gdh */
if (debug) printf("Before gdh_init\n");
sts = gdh_Init((char *) pname);
if ( EVEN(sts)) {
errh_Fatal("gdh_Init, %m", sts);
errh_SetStatus(PWR__SRVTERM);
exit(sts);
}
/* Arg number 3 should be my remnodes objid in string representation,
read it, convert to real objid and store in remnode_item */
sts = 0;
if (argc >= 3) sts = cdh_StringToObjid(argv[2], &rn.objid);
if ( EVEN(sts)) {
errh_Fatal("cdh_StringToObjid, %m", sts);
errh_SetStatus(PWR__SRVTERM);
exit(sts);
}
/* Get pointer to RemnodeWMQ object and store locally */
sts = gdh_ObjidToPointer(rn.objid, (pwr_tAddress *) &rn_wmq);
if ( EVEN(sts)) {
errh_Fatal("cdh_ObjidToPointer, %m", sts);
errh_SetStatus(PWR__SRVTERM);
exit(sts);
}
/* Initialize some internal data and make standard remtrans init */
rn.next = NULL;
rn.local = NULL; // We dont use local structure since we only have one remnode
rn_wmq->ErrCount = 0;
if (debug) printf("Before remtrans_init\n");
sts = RemTrans_Init(&rn);
if ( EVEN(sts)) {
errh_Fatal("RemTrans_Init, %m", sts);
errh_SetStatus(PWR__SRVTERM);
exit(sts);
}
/* Store remtrans objects objid in remnode_mq object */
remtrans = rn.remtrans;
i = 0;
while(remtrans) {
rn_wmq->RemTransObjects[i++] = remtrans->objid;
if ( i >= (int)(sizeof(rn_wmq->RemTransObjects) / sizeof(rn_wmq->RemTransObjects[0])))
break;
remtrans = (remtrans_item *) remtrans->next;
}
/* Variables for MQ calls */
strncpy(mgr_name, rn_wmq->QueueManager, MQ_Q_MGR_NAME_LENGTH);
// strncpy(mgr_name, "hejsanqqq", sizeof(MQ_Q_MGR_NAME_LENGTH));
/* Connect to specified queue manager */
MQCONN(mgr_name, &Hconn, &CompCode, &Reason);
if ((CompCode != MQCC_OK) | (Reason != MQRC_NONE)) {
errh_Fatal("MQCONN failed, queue mgr: %s, Code: %d, Reason: %d", mgr_name, CompCode, Reason);
errh_SetStatus(PWR__SRVTERM);
exit(0);
}
/* Open queue for receiving messages */
strncpy(rcv_que_name, rn_wmq->RcvQueue, MQ_Q_NAME_LENGTH);
// strncpy(rcv_que_name, "hejsanqqq", sizeof(MQ_Q_NAME_LENGTH));
/* Initialize object descriptor control block */
strncpy(RcvObjDesc.ObjectName, rcv_que_name, MQ_Q_NAME_LENGTH);
/* open queue for input but not if MQM stopping */
RcvOpenOptions = MQOO_INPUT_AS_Q_DEF | MQOO_FAIL_IF_QUIESCING;
/* Open queue */
MQOPEN(Hconn, &RcvObjDesc, RcvOpenOptions, &RcvHobj, &CompCode, &Reason);
if ((CompCode != MQCC_OK) | (Reason != MQRC_NONE)) {
errh_Fatal("MQOPEN failed, queue: %s, Code: %d, Reason: %d", rcv_que_name, CompCode, Reason);
errh_SetStatus(PWR__SRVTERM);
exit(0);
}
/* Open queue for sending messages */
strncpy(snd_que_name, rn_wmq->SndQueue, MQ_Q_NAME_LENGTH);
// strncpy(snd_que_name, "hejsanqqq", sizeof(MQ_Q_NAME_LENGTH));
/* Initialize object descriptor control block */
strncpy(SndObjDesc.ObjectName, snd_que_name, MQ_Q_NAME_LENGTH);
/* open queue for output but not if MQM stopping */
SndOpenOptions = MQOO_OUTPUT | MQOO_FAIL_IF_QUIESCING;
MQOPEN(Hconn, &SndObjDesc, SndOpenOptions, &SndHobj, &CompCode, &Reason);
if ((CompCode != MQCC_OK) | (Reason != MQRC_NONE)) {
errh_Fatal("MQOPEN failed, queue: %s, Code: %d, Reason: %d", snd_que_name, CompCode, Reason);
errh_SetStatus(PWR__SRVTERM);
exit(0);
}
errh_SetStatus(PWR__SRUN);
/* Set (re)start time in remnode object */
time_GetTime(&rn_wmq->RestartTime);
/* Loop forever */
while (!doomsday) {
if (rn_wmq->Disable == 1) {
errh_Fatal("Disabled, exiting");
errh_SetStatus(PWR__SRVTERM);
exit(0);
}
aproc_TimeStamp(TIME_INCR, 5);
RemoteSleep(TIME_INCR);
time_since_scan += TIME_INCR;
sts = wmq_receive();
if (time_since_scan >= rn_wmq->ScanTime) {
sts = RemTrans_Cyclic(&rn, &wmq_send);
time_since_scan = 0.0;
}
}
}
......@@ -43,6 +43,7 @@
* 040303 J Nylund ndrat till omgivningsvariabel
* i skvgarna till exe-filerna
* 040422 C Jurstrand 4.0.0
* 101209 R Karlsson Adderat std fr Websphere MQ
*
* Description:
* Start and control of transportprocesses for remote communication
......@@ -111,6 +112,7 @@ static void AddTransports()
pwr_tObjid objid;
pwr_tAddress objref;
pwr_tStatus sts;
pwr_tUInt32 id = 1;
/* Init transport (process) counter */
tpcount = 0;
......@@ -118,6 +120,30 @@ static void AddTransports()
/* Initialize transport vector with 0's */
memset(&tp, 0, sizeof(tp));
/* Get and configure all WMQ remnodes, one process for each remnode */
sts = gdh_GetClassList (pwr_cClass_RemnodeWMQ, &objid);
while ( ODD(sts))
{
sts = gdh_ObjidToPointer(objid, &objref);
sprintf(tp[tpcount].path, "rs_remote_wmq");
tp[tpcount].id = id++;
tp[tpcount].disable = &((pwr_sClass_RemnodeWMQ *) objref)->Disable;
tp[tpcount].restart_limit = &((pwr_sClass_RemnodeWMQ *) objref)->RestartLimit;
tp[tpcount].restarts = &((pwr_sClass_RemnodeWMQ *) objref)->RestartCount;
((pwr_sClass_RemnodeWMQ *) objref)->RestartCount = 0;
tp[tpcount].objid = objid;
tp[tpcount].objref = objref;
tp[tpcount].classid = pwr_cClass_RemnodeWMQ;
tp[tpcount].cpid = -1;
tp[tpcount].first = true;
remcfgp->RemNodeObjects[tpcount] = objid;
tpcount++;
sts = gdh_GetNextObject (objid, &objid);
}
/* Get and configure all MQ remnodes, one process for each remnode */
sts = gdh_GetClassList (pwr_cClass_RemnodeMQ, &objid);
......
!
! Proview $Id: remote_c_remnodemq.wb_load,v 1.1 2006-01-12 06:39:33 claes Exp $
! Copyright (C) 2005 SSAB Oxelsund AB.
!
! This program is free software; you can redistribute it and/or
! modify it under the terms of the GNU General Public License as
! published by the Free Software Foundation, either version 2 of
! the License, or (at your option) any later version.
!
! This program is distributed in the hope that it will be useful
! but WITHOUT ANY WARRANTY; without even the implied warranty of
! MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
! GNU General Public License for more details.
!
! You should have received a copy of the GNU General Public License
! along with the program, if not, write to the Free Software
! Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
!
! remote_c_remnodewmq.wb_load -- Defines the class RemnodeWMQ.
!
SObject Remote:Class
!/**
! @Version 1.0
! @Group Servers,NodeConfiguration
! @Summary Configures communication through a message queue using Webspere MQ.
! Configures communication through a message queue using Websphere MQ as a client.
! All regarding Server, channel, queue manager and queues to connect to is configured
! in the RemonodeWMQ-object.
! Configurations for the different messages is configured in the RemtTrans-object that define
! each in- and outgoing message. For each RemTrans-message the following can be configured:
!
! TransName - a string that defines the MsgId (message identity of the message).
! Address[0] - defines whether the message should be sent as a persistent message or not.
! 1 means that the message will be sent as a persistent message. 0 not.
!
! @b Object graph
! @image orm_remnodewmq_og.gif
!
! @b See also
! @classlink RemoteConfig pwrp_remoteconfig.html
! @classlink RemTrans ssab_remtrans.html
!*/
Object RemnodeWMQ $ClassDef 33
Body SysBody
Attr Editor = pwr_eEditor_AttrEd
Attr Method = pwr_eMethod_Standard
Attr PopEditor = 2
EndBody
Object RtBody $ObjBodyDef 1
Body SysBody
Attr StructName = "RemnodeWMQ"
EndBody
!/**
! Optional description.
!*/
Object Description $Attribute 1
Body SysBody
Attr TypeRef = "pwrs:Type-$String80"
EndBody
EndObject
!/**
! Process priority for the transport process. For future use.
!*/
Object Prio $Attribute 2
Body SysBody
Attr TypeRef = "pwrs:Type-$Int32"
EndBody
EndObject
!/**
! Name of Queue Manager to connect to.
! Server to connect to is defined by enviroment variable MQSERVER.
! export MQSERVER=<ChannelName>/<TransportType>/<ConnectionName>[(port)]
! For example:
! export MQSERVER=CHANNEL1/TCP/192.168.40.20(2020)
! The port is by default 1414 and will be used if no port is given.
!*/
Object QueueManager $Attribute 3
Body SysBody
Attr TypeRef = "pwrs:Type-$String80"
EndBody
EndObject
!/**
! Name of WMQ queue for all incoming messages.
!*/
Object RcvQueue $Attribute 4
Body SysBody
Attr TypeRef = "pwrs:Type-$String80"
EndBody
EndObject
!/**
! Name of WMQ queue for all outgoing messages.
!*/
Object SndQueue $Attribute 5
Body SysBody
Attr TypeRef = "pwrs:Type-$String80"
EndBody
EndObject
!/**
! Scantime in seconds for outgoing RemTrans messages.
! Dynamic change is possible.
!*/
Object ScanTime $Attribute 6
Body SysBody
Attr TypeRef = "pwrs:Type-$Float32"
EndBody
EndObject
!/**
! When set, this attribute tells the remote handler not to start
! or restart the process that handles this remote node. If the transport process
! is running while the attribute is set it will terminate. This can be used to force
! a restart of the process.
!*/
Object Disable $Attribute 7
Body SysBody
Attr TypeRef = "pwrs:Type-$Boolean"
EndBody
EndObject
!/**
! This attribute shows how many times the remote handler has restarted the
! transport process that handles this remnode.
! Dynamic change is possible and can be used in order to earn more restarts.
!*/
Object RestartCount $Attribute 8
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
Attr Flags |= PWR_MASK_STATE
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_INVISIBLE
EndBody
EndObject
!/**
! The restart limit tells the remote handler how many times the transport process
! that handle this remnode can be restarted.
! Dynamic change is possible and can be used in order to earn more restarts.
!*/
Object RestartLimit $Attribute 9
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
EndBody
EndObject
!/**
! The restart time is set by the transport process at startup and therefore
! shows the latest (re)starttime.
!*/
Object RestartTime $Attribute 10
Body SysBody
Attr TypeRef = "pwrs:Type-$Time"
Attr Flags |= PWR_MASK_STATE
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_INVISIBLE
EndBody
EndObject
!/**
! Error counter.
!*/
Object ErrCount $Attribute 11
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
Attr Flags |= PWR_MASK_STATE
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_INVISIBLE
EndBody
EndObject
!/**
! Type of remnode. Used in the RemoteConfig classgraph.
!*/
Object Id $Attribute 12
Body SysBody
Attr TypeRef = "pwrs:Type-$String8"
Attr Flags |= PWR_MASK_INVISIBLE
EndBody
EndObject
!/**
! Contains the objid for the RemTrans objects for this remnode.
! The objid's are inserted by the remote process.
!*/
Object RemTransObjects $Attribute 13
Body SysBody
Attr TypeRef = "pwrs:Type-$Objid"
Attr Flags |= PWR_MASK_STATE
Attr Flags |= PWR_MASK_INVISIBLE
Attr Flags |= PWR_MASK_ARRAY
Attr Elements = 25
EndBody
EndObject
EndObject
Object Template RemnodeWMQ
Body RtBody
Attr Prio = 15
Attr Disable = 0
Attr RestartCount = 0
Attr RestartLimit = 100
Attr ScanTime = 0.1
Attr Id = "WMQ"
EndBody
EndObject
EndObject
EndSObject
......@@ -289,6 +289,7 @@ else
pwre_config_check_lib libcrypt LIBCRYPT lib lib 0 /usr/lib/libcrypt.so:/usr/lib/libcrypt.a
pwre_config_check_lib mysql MYSQL lib lib 1 /usr/lib/libmysqlclient.so:/usr/lib/mysql/libmysqlclient.so
pwre_config_check_lib mq MQ lib lib 1 /usr/lib/libdmq.so
pwre_config_check_lib wmq WMQ lib lib 1 /usr/lib/libmqic.so
pwre_config_check_lib libpnioif PNAK lib lib 1 /usr/lib/libpnioif.a:/usr/local/lib/libpnioif.a
pwre_config_check_lib libusb LIBUSB lib lib 1 /usr/lib/libusb-1.0.so
pwre_config_check_lib librt LIBRT lib lib 0 /usr/lib/librt.so:/usr/lib/librt.a
......@@ -301,6 +302,7 @@ else
pwre_config_check_include gtk GTK 1 /usr/local/include/gtk-2.0/gtk.h:/usr/local/include/gtk-2.0/gtk/gtk.h:/usr/include/gtk-2.0/gtk/gtk.h
pwre_config_check_include jni JNI 1 $jdk/include/jni.h
pwre_config_check_include jni JNI 0 $jdk/include/linux/jni_md.h
pwre_config_check_include wmq WMQ 0 /opt/mqm/inc/cmqc.h
export pwre_conf_alsa=1
......
......@@ -182,7 +182,7 @@ palette NavigatorPalette
menu Siemens
{
class Siemens_Motor_1LA
class Siemens_G120_Tgm1
class Sinamics_G120_Tgm1
}
menu Suco
{
......@@ -338,6 +338,7 @@ palette NavigatorPalette
class RemnodeALCM
class RemnodeModbus
class RemnodeMQ
class RemnodeWMQ
class RemnodeSerial
class RemnodeTCP
class RemnodeUDP
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment