Commit 1b910a70 authored by Claes Sjofors's avatar Claes Sjofors

Sev server threads

parent 56e6ae1b
......@@ -87,6 +87,7 @@ int sev_server::init( int noneth)
pwr_tStatus sts;
qcom_sAid aid;
qcom_sQid qini;
sev_sDbConfig db_config;
m_server_status = PWR__SRVSTARTUP;
......@@ -96,6 +97,7 @@ int sev_server::init( int noneth)
errh_Init( "sev_server", (errh_eAnix)0);
m_noneth = noneth;
if (!m_noneth) {
// Check server config object
pwr_tOid conf_oid;
......@@ -111,33 +113,85 @@ int sev_server::init( int noneth)
pwr_sAttrRef aref = cdh_ObjidToAref( conf_oid);
sts = gdh_DLRefObjectInfoAttrref( &aref, (void **)&m_config, &m_config_dlid);
if ( EVEN(sts))
m_config = 0;
// Get configured database
pwr_tAttrRef daref;
pwr_eSevDatabaseEnum db_enum;
sts = gdh_ArefANameToAref( &aref, "Database", &daref);
if ( ODD(sts)) {
sts = gdh_GetObjectInfoAttrref( &daref, (void *)&db_enum, sizeof(db_enum));
if ( ODD(sts)) {
switch ( db_enum) {
case pwr_eSevDatabaseEnum_MySQL:
set_dbtype( sev_eDbType_Mysql);
break;
case pwr_eSevDatabaseEnum_SQLite:
set_dbtype( sev_eDbType_Sqlite);
break;
case pwr_eSevDatabaseEnum_HDF5:
set_dbtype( sev_eDbType_HDF5);
break;
}
}
if ( EVEN(sts)) {
errh_CErrLog( PWR__SRVNOTCONF, 0);
exit(0);
}
}
else {
// Read config from proview.cnf
static pwr_sClass_SevServer config;
char str[80];
float fvalue;
memset( &config, 0, sizeof(config));
m_config = &config;
m_config_dlid = pwr_cNDlid;
if ( cnf_get_value( "sevDatabaseType", str, sizeof(str))) {
if ( cdh_NoCaseStrcmp( str, "sqlite") == 0)
m_config->Database = sev_eDbType_Sqlite;
else if ( cdh_NoCaseStrcmp( str, "mysql") == 0)
m_config->Database = sev_eDbType_Mysql;
else if ( cdh_NoCaseStrcmp( str, "hdf5") == 0)
m_config->Database = sev_eDbType_HDF5;
else
m_config->Database = sev_eDbType_Mysql;
}
else
m_config->Database = sev_eDbType_Mysql;
if ( cnf_get_value( "sevUseServerThreads", str, sizeof(str))) {
if ( cdh_NoCaseStrcmp( str, "1") == 0)
m_config->UseServerThreads = 1;
}
if ( cnf_get_value( "sevLinearRegrAll", str, sizeof(str))) {
if ( cdh_NoCaseStrcmp( str, "1") == 0)
m_config->LinearRegrAll = 1;
}
if ( cnf_get_value( "sevMeanValueAll", str, sizeof(str))) {
if ( cdh_NoCaseStrcmp( str, "1") == 0)
m_config->MeanValueAll = 1;
}
if ( cnf_get_value( "sevMeanValueInterval1", str, sizeof(str))) {
if ( sscanf( str, "%f", &fvalue) == 1)
m_config->MeanValueInterval1 = fvalue;
}
if ( cnf_get_value( "sevMeanValueInterval2", str, sizeof(str))) {
if ( sscanf( str, "%f", &fvalue) == 1)
m_config->MeanValueInterval2 = fvalue;
}
if ( cnf_get_value( "sevLinearRegrMaxTime", str, sizeof(str))) {
if ( sscanf( str, "%f", &fvalue) == 1)
m_config->LinearRegrMaxTime = fvalue;
}
}
switch ( m_config->Database) {
case pwr_eSevDatabaseEnum_MySQL:
set_dbtype( sev_eDbType_Mysql);
break;
case pwr_eSevDatabaseEnum_SQLite:
set_dbtype( sev_eDbType_Sqlite);
break;
case pwr_eSevDatabaseEnum_HDF5:
set_dbtype( sev_eDbType_HDF5);
break;
default:
set_dbtype( sev_eDbType_Mysql);
}
m_db = sev_db::open_database( m_db_type);
memset( &db_config, 0, sizeof(db_config));
db_config.LinearRegrMaxTime = m_config->LinearRegrMaxTime;
db_config.LinearRegrAll = m_config->LinearRegrAll;
db_config.MeanValueAll = m_config->MeanValueAll;
db_config.MeanValueInterval1 = m_config->MeanValueInterval1;
db_config.MeanValueInterval2 = m_config->MeanValueInterval2;
cnf_get_value( "sevMysqlEngine", db_config.Engine, sizeof(db_config.Engine));
cnf_get_value( "mysqlSocket", db_config.Socket, sizeof(db_config.Socket));
m_db = sev_db::open_database( m_db_type, &db_config);
if ( !m_db) {
errh_Fatal( "Database open error");
exit(0);
......@@ -222,6 +276,7 @@ int sev_server::connect()
msg = (sev_sMsgAny *) qcom_Alloc(&lsts, put.size);
msg->Type = sev_eMsgType_NodeUp;
msg->Version = sev_cNetVersion;
put.data = msg;
put.allocate = 0;
......@@ -257,6 +312,7 @@ int sev_server::request_items( pwr_tNid nid)
msg = (sev_sMsgAny *) qcom_Alloc(&lsts, put.size);
msg->Type = sev_eMsgType_HistItemsRequest;
msg->Version = sev_cNetVersion;
put.data = msg;
put.allocate = 0;
......@@ -295,6 +351,7 @@ int sev_server::send_itemlist( qcom_sQid tgt)
put.allocate = 0;
((sev_sMsgHistItems *)put.data)->Type = sev_eMsgType_HistItems;
((sev_sMsgHistItems *)put.data)->Version = sev_cNetVersion;
((sev_sMsgHistItems *)put.data)->NumItems = item_cnt;
((sev_sMsgHistItems *)put.data)->NumAttributes = itemattr_cnt;
......@@ -351,6 +408,7 @@ int sev_server::send_server_status( qcom_sQid tgt)
put.allocate = 0;
((sev_sMsgServerStatus *)put.data)->Type = sev_eMsgType_ServerStatus;
((sev_sMsgServerStatus *)put.data)->Version = sev_cNetVersion;
sts = m_server_status;
......@@ -389,6 +447,7 @@ int sev_server::delete_item( qcom_sQid tgt, sev_sMsgHistItemDelete *rmsg)
m_db->delete_item( &sts, rmsg->Oid, rmsg->AName);
((sev_sMsgHistItemStatus *)put.data)->Type = sev_eMsgType_HistItemStatus;
((sev_sMsgHistItemStatus *)put.data)->Version = sev_cNetVersion;
((sev_sMsgHistItemStatus *)put.data)->Oid = rmsg->Oid;
strcpy( ((sev_sMsgHistItemStatus *)put.data)->AName, rmsg->AName);
......@@ -460,12 +519,12 @@ int sev_server::mainloop()
m_stat.medium_storage_rate = a * m_stat.medium_storage_rate + (1.0-a) * m_stat.storage_rate;
m_storage_cnt = 0;
m_db->store_stat( &m_stat);
if ( m_config) {
m_config->Stat.CurrentLoad = m_stat.current_load;
m_config->Stat.MediumLoad = m_stat.medium_load;
m_config->Stat.StorageRate = m_stat.storage_rate;
m_config->Stat.MediumStorageRate = m_stat.medium_storage_rate;
}
m_config->Stat.CurrentLoad = m_stat.current_load;
m_config->Stat.MediumLoad = m_stat.medium_load;
m_config->Stat.StorageRate = m_stat.storage_rate;
m_config->Stat.MediumStorageRate = m_stat.medium_storage_rate;
time_Aadd( &next_stat, &next_stat, &stat_interval);
busy = pwr_cNDeltaTime;
idle = pwr_cNDeltaTime;
......@@ -534,12 +593,10 @@ int sev_server::mainloop()
qcom_Free( &sts, mp);
if ( m_config) {
m_config->Stat.DataStoreMsgCnt = m_stat.datastore_msg_cnt;
m_config->Stat.DataGetMsgCnt = m_stat.dataget_msg_cnt;
m_config->Stat.ItemsMsgCnt = m_stat.items_msg_cnt;
m_config->Stat.EventStoreMsgCnt = m_stat.eventstore_msg_cnt;
}
m_config->Stat.DataStoreMsgCnt = m_stat.datastore_msg_cnt;
m_config->Stat.DataGetMsgCnt = m_stat.dataget_msg_cnt;
m_config->Stat.ItemsMsgCnt = m_stat.items_msg_cnt;
m_config->Stat.EventStoreMsgCnt = m_stat.eventstore_msg_cnt;
}
}
......@@ -765,31 +822,74 @@ int sev_server::check_histitems( sev_sMsgHistItems *msg, unsigned int size)
int sev_server::receive_histdata( sev_sMsgHistDataStore *msg, unsigned int size)
{
pwr_tStatus sts;
sev_sHistData *dp = (sev_sHistData *)&msg->Data;
sev_sHistData *dp;
pwr_tTime time;
pwr_tUInt32 server_thread;
m_db->begin_transaction();
if ( msg->Version == 0) {
// Server thread was added in version 1
dp = (sev_sHistData *) &((sev_sMsgHistDataStoreV0 *)msg)->Data;
server_thread = 0;
}
else {
dp = (sev_sHistData *) &msg->Data;
server_thread = msg->ServerThread;
}
while ( (char *)dp - (char *)msg < (int)size) {
sev_sRefid *rp;
pwr_tRefId rk = dp->sevid;
if ( !m_config->UseServerThreads) {
m_db->begin_transaction( 0);
rp = (sev_sRefid *) tree_Find(&sts, m_refid, &rk);
if ( !rp) {
while ( (char *)dp - (char *)msg < (int)size) {
sev_sRefid *rp;
pwr_tRefId rk = dp->sevid;
rp = (sev_sRefid *) tree_Find(&sts, m_refid, &rk);
if ( !rp) {
dp = (sev_sHistData *)((char *)dp + sizeof( *dp) - sizeof(dp->data) + dp->size);
continue;
}
unsigned int idx = rp->idx;
time = net_NetTimeToTime( &msg->Time);
m_db->store_value( &m_sts, 0, idx, 0, time, &dp->data, dp->size);
m_storage_cnt++;
dp = (sev_sHistData *)((char *)dp + sizeof( *dp) - sizeof(dp->data) + dp->size);
continue;
}
unsigned int idx = rp->idx;
time = net_NetTimeToTime( &msg->Time);
m_db->store_value( &m_sts, idx, 0, time, &dp->data, dp->size);
m_storage_cnt++;
dp = (sev_sHistData *)((char *)dp + sizeof( *dp) - sizeof(dp->data) + dp->size);
m_db->commit_transaction( 0);
}
else {
sev_sThread *th;
sev_sReceiveHistDataMsg *qmsg;
pwr_tUInt32 key;
m_db->commit_transaction();
// if ( key == nodeid)
key = server_thread;
th = find_thread( key);
if ( !th) {
th = create_thread( key);
printf( "sev_server, new thread %d\n", key);
}
// Create a queue message
if ( th->alloc + sizeof(*qmsg) - sizeof(qmsg->data) + size > m_config->ThreadQueueLimit) {
// Queue maxlimit exceede, discard message
m_config->ServerThreads[th->conf_idx].LostCnt++;
return 1;
}
qmsg = (sev_sReceiveHistDataMsg *)malloc( sizeof(*qmsg) - sizeof(qmsg->data) + size);
memcpy( &qmsg->data, dp, size);
qmsg->size = size;
qmsg->time = msg->Time;
th->alloc += qmsg->size;
if ( th->conf_idx >= 0)
m_config->ServerThreads[th->conf_idx].QueueAlloc = th->alloc;
que_Put( &sts, &th->queue, &qmsg->e, qmsg);
}
return 1;
}
......@@ -866,6 +966,7 @@ void *sev_server::send_histdata_thread( void *arg)
put.allocate = 0;
msg->Type = sev_eMsgType_HistDataGet;
msg->Version = sev_cNetVersion;
msg->Oid = rmsg->Oid;
strncpy( msg->AName, rmsg->AName, sizeof(msg->AName));
if ( ODD(sts)) {
......@@ -927,6 +1028,7 @@ int sev_server::send_objecthistdata( qcom_sQid tgt, sev_sMsgHistDataGetRequest *
put.allocate = 0;
msg->Type = sev_eMsgType_HistObjectDataGet;
msg->Version = sev_cNetVersion;
msg->Oid = rmsg->Oid;
strncpy( msg->AName, rmsg->AName, sizeof(msg->AName));
msg->Status = m_sts;
......@@ -1055,42 +1157,122 @@ void sev_server::garbage_item( int idx)
}
}
int main (int argc, char *argv[])
sev_sThread *sev_server::find_thread( int key)
{
sev_server srv;
int noneth = 0;
sev_eDbType dbtype = sev_eDbType_;
char str[80];
return m_thread_list[key];
}
if ( argc > 1 && strcmp( argv[1], "-n") == 0)
noneth = 1;
if ( argc > 2 + noneth && strcmp( argv[1+noneth], "-d") == 0 && strcmp( argv[2+noneth], "sqlite") == 0)
dbtype = sev_eDbType_Sqlite;
else if ( argc > 2 + noneth && strcmp( argv[1+noneth], "-d") == 0 && strcmp( argv[2+noneth], "mysql") == 0)
dbtype = sev_eDbType_Mysql;
else if ( argc > 2 + noneth && strcmp( argv[1+noneth], "-d") == 0 && strcmp( argv[2+noneth], "hdf5") == 0)
dbtype = sev_eDbType_HDF5;
if ( dbtype == sev_eDbType_) {
char type[80];
if ( cnf_get_value( "sevDatabaseType", type, sizeof(type))) {
if ( cdh_NoCaseStrcmp( type, "sqlite") == 0)
dbtype = sev_eDbType_Sqlite;
else if ( cdh_NoCaseStrcmp( type, "mysql") == 0)
dbtype = sev_eDbType_Mysql;
else if ( cdh_NoCaseStrcmp( type, "hdf5") == 0)
dbtype = sev_eDbType_HDF5;
void *sev_server::receive_histdata_thread( void *arg)
{
sev_server *sev = (sev_server *)((sev_sReceiveHistDataThread *)arg)->ctx;
int tmo_item;
pwr_tDeltaTime tmo = {1, 0};
sev_sReceiveHistDataMsg *msg;
sev_sThread *th = ((sev_sReceiveHistDataThread *)arg)->th;
sev_sHistData *dp;
pwr_tStatus sts;
pwr_tTime time;
free( arg);
printf( "New thread %d\n", th->key);
while ( 1) {
msg = (sev_sReceiveHistDataMsg *)que_Get( NULL, &th->queue, &tmo, &tmo_item);
if ( (int *)msg == &tmo_item) {
// printf( "Tmo %d\n", th->key);
}
else {
dp = (sev_sHistData *) &msg->data;
sev->m_db->begin_transaction( th->db_ctx);
while ( (char *)dp - (char *)msg < (int)msg->size) {
sev_sRefid *rp;
pwr_tRefId rk = dp->sevid;
rp = (sev_sRefid *) tree_Find(&sts, sev->m_refid, &rk);
if ( !rp) {
dp = (sev_sHistData *)((char *)dp + sizeof( *dp) - sizeof(dp->data) + dp->size);
continue;
}
unsigned int idx = rp->idx;
time = net_NetTimeToTime( &msg->time);
sev->m_db->store_value( &sev->m_sts, th->db_ctx, idx, 0, time, &dp->data, dp->size);
sev->m_storage_cnt++;
dp = (sev_sHistData *)((char *)dp + sizeof( *dp) - sizeof(dp->data) + dp->size);
}
sev->m_db->commit_transaction( th->db_ctx);
th->alloc -= msg->size;
if ( th->conf_idx >= 0)
sev->m_config->ServerThreads[th->conf_idx].QueueAlloc = th->alloc;
free( msg);
}
}
if ( cnf_get_value( "sevReadThreads", str, sizeof(str))) {
if ( cdh_NoCaseStrcmp( str, "yes") == 0)
srv.m_read_threads = 1;
return (void *)1;
}
sev_sThread *sev_server::create_thread( int key)
{
int sts;
sev_sThread *th = (sev_sThread *)calloc( 1, sizeof(sev_sThread));
th->key = key;
que_Create( NULL, &th->queue);
th->db_ctx = m_db->new_thread();
if ( m_thread_cnt < sizeof(m_config->ServerThreads)/sizeof(m_config->ServerThreads[0]))
th->conf_idx = m_thread_cnt;
else
th->conf_idx = -1;
m_thread_cnt++;
m_thread_list[key] = th;
sev_sReceiveHistDataThread *arg = (sev_sReceiveHistDataThread *)malloc( sizeof(*arg));
arg->ctx = this;
arg->th = th;
sts = pthread_create( &th->thread, NULL, receive_histdata_thread, arg);
if ( sts != 0)
printf( "sev_server: pthread_create error %d\n", sts);
if ( th->conf_idx >= 0) {
m_config->ServerThreads[th->conf_idx].Key = key;
}
if ( dbtype == sev_eDbType_)
dbtype = sev_eDbType_Mysql;
return th;
}
void sev_server::delete_thread( int key)
{
sev_sThread *th;
srv.set_dbtype( dbtype);
th = m_thread_list[key];
if ( !th)
return;
free( th);
m_thread_list.erase(key);
}
int main (int argc, char *argv[])
{
sev_server srv;
int noneth = 0;
if ( argc > 1 && strcmp( argv[1], "-n") == 0)
noneth = 1;
srv.init( noneth);
srv.connect();
......
......@@ -43,6 +43,7 @@
#include "pwr.h"
#include "pwr_class.h"
#include "pwr_baseclasses.h"
#include "rt_que.h"
#include "rt_sev_net.h"
#include "sev_db.h"
......@@ -68,11 +69,35 @@ typedef struct {
unsigned int item_idx;
} sev_sHistDataThread;
typedef struct {
int key;
pthread_t thread;
que_sQue queue;
int alloc;
void *db_ctx;
int conf_idx;
} sev_sThread;
typedef struct {
void *ctx;
sev_sThread *th;
} sev_sReceiveHistDataThread;
typedef struct {
lst_sEntry e;
net_sTime time;
int size;
char data[1];
} sev_sReceiveHistDataMsg;
typedef map<int, sev_sThread *>::iterator threadlist_iterator;
class sev_server {
public:
sev_server() : m_server_status(0), m_refid(0), m_msg_id(0), m_storage_cnt(0),
m_db_type(sev_eDbType_Sqlite), m_config(0), m_read_threads(0) {memset(&m_stat,0,sizeof(m_stat));}
m_db_type(sev_eDbType_Sqlite), m_config(0), m_thread_cnt(0), m_read_threads(0)
{ memset(&m_stat,0,sizeof(m_stat));}
pwr_tStatus m_sts;
pwr_tStatus m_server_status;
......@@ -85,8 +110,10 @@ class sev_server {
sev_sStat m_stat;
sev_eDbType m_db_type;
pwr_sClass_SevServer *m_config;
unsigned int m_thread_cnt;
pwr_tDlid m_config_dlid;
int m_read_threads;
map<int, sev_sThread *>m_thread_list;
int init( int noneth);
int connect();
......@@ -103,6 +130,10 @@ class sev_server {
void garbage_collector();
void garbage_item( int idx);
void set_dbtype( sev_eDbType type) { m_db_type = type;}
sev_sThread *find_thread( int key);
static void *receive_histdata_thread( void *arg);
sev_sThread *create_thread( int key);
void delete_thread( int key);
static void *send_histdata_thread( void *arg);
};
......
......@@ -45,35 +45,35 @@
#include "rt_sev_msg.h"
sev_db *sev_db::open_database( sev_eDbType type)
sev_db *sev_db::open_database( sev_eDbType type, sev_sDbConfig *cnf)
{
sev_db *db = 0;
if ( type == sev_eDbType_Mysql) {
#if defined PWRE_CONF_MYSQL
return sev_dbms::open_database();
db = sev_dbms::open_database();
#else
printf( "** Release is not built with mysql\n");
return 0;
#endif
}
else if ( type == sev_eDbType_Sqlite) {
#if defined PWRE_CONF_SQLITE3
return sev_dbsqlite::open_database();
db = sev_dbsqlite::open_database();
#else
printf( "** Release is not built with sqlite3\n");
return 0;
#endif
}
else if ( type == sev_eDbType_HDF5) {
#if defined PWRE_CONF_HDF5
return sev_dbhdf5::open_database();
db = sev_dbhdf5::open_database();
#else
printf( "** Release is not built with HDF5\n");
return 0;
#endif
}
else
return 0;
if ( db)
db->m_cnf = *cnf;
return db;
}
int sev_db::get_systemname( char *name)
......
......@@ -57,6 +57,16 @@ typedef enum {
sev_eDbType_HDF5
} sev_eDbType;
typedef struct {
float LinearRegrMaxTime;
int LinearRegrAll;
int MeanValueAll;
float MeanValueInterval1;
float MeanValueInterval2;
pwr_tFileName Socket;
char Engine[80];
} sev_sDbConfig;
typedef struct {
float current_load;
float medium_load;
......@@ -184,6 +194,7 @@ class sev_db {
vector<sev_item> m_items;
float m_meanvalue_interval1;
float m_meanvalue_interval2;
sev_sDbConfig m_cnf;
sev_db() : m_meanvalue_interval1(0), m_meanvalue_interval2(0) {}
virtual ~sev_db() {}
......@@ -203,7 +214,7 @@ class sev_db {
pwr_tFloat32 deadband, pwr_tMask options, unsigned int *idx)
{ *sts = 0; return 0;}
virtual int delete_item( pwr_tStatus *sts, pwr_tOid oid, char *aname) { *sts = 0; return 0;}
virtual int store_value( pwr_tStatus *sts, int item_idx, int attr_idx,
virtual int store_value( pwr_tStatus *sts, void *thread, int item_idx, int attr_idx,
pwr_tTime time, void *buf, unsigned int size) { *sts = 0; return 0;}
virtual int get_values( pwr_tStatus *sts, pwr_tOid oid, pwr_tMask options, float deadband,
char *aname, pwr_eType type,
......@@ -243,11 +254,12 @@ class sev_db {
virtual int alter_engine( pwr_tStatus *sts, char *tablename) { *sts = 0; return 0;}
virtual int optimize( pwr_tStatus *sts, char *tablename) { *sts = 0; return 0;}
virtual int store_stat( sev_sStat *stat) { return 0;}
virtual int begin_transaction() { return 0;}
virtual int commit_transaction() { return 0;}
virtual int begin_transaction( void *thread) { return 0;}
virtual int commit_transaction( void *thread) { return 0;}
virtual char *dbName() { return 0;}
virtual void *new_thread() { return 0;}
static sev_db *open_database( sev_eDbType type);
static sev_db *open_database( sev_eDbType type, sev_sDbConfig *cnf);
static int get_systemname( char *name);
};
#endif
......@@ -226,9 +226,9 @@ pwr_tStatus sev_db::tree_update_value( int item_idx, pwr_tTime time, void *buf)
}
if ( m_items[item_idx].options & pwr_mSevOptionsMask_MeanValue1)
interval = m_meanvalue_interval1;
interval = m_cnf.MeanValueInterval1;
else if ( m_items[item_idx].options & pwr_mSevOptionsMask_MeanValue2)
interval = m_meanvalue_interval2;
interval = m_cnf.MeanValueInterval2;
if ( interval != 0) {
pwr_tDeltaTime dtime;
......
......@@ -2172,7 +2172,7 @@ int sev_dbhdf5::get_items( pwr_tStatus *sts)
return 1;
}
int sev_dbhdf5::store_value( pwr_tStatus *sts, int item_idx, int attr_idx,
int sev_dbhdf5::store_value( pwr_tStatus *sts, void *thread, int item_idx, int attr_idx,
pwr_tTime time, void *buf, unsigned int size)
{
sev_uDataType data;
......@@ -4872,4 +4872,4 @@ sev_dbhdf5::~sev_dbhdf5()
#else
extern int no_sev_dbhdf5;
int no_sev_dbhdf5 = 0;
#endif
\ No newline at end of file
#endif
......@@ -777,7 +777,7 @@ class sev_dbhdf5 : public sev_db {
pwr_tDeltaTime storagetime, pwr_eType type, unsigned int size,
char *description, char *unit, pwr_tFloat32 scantime,
pwr_tFloat32 deadband, pwr_tMask options, unsigned int *idx);
int store_value( pwr_tStatus *sts, int item_idx, int attr_idx,
int store_value( pwr_tStatus *sts, void *thread, int item_idx, int attr_idx,
pwr_tTime time, void *buf, unsigned int size);
int get_values( pwr_tStatus *sts, pwr_tOid oid, pwr_tMask options, float deadband, char *aname,
pwr_eType type, unsigned int size, pwr_tFloat32 scantime, pwr_tTime *creatime,
......@@ -861,4 +861,4 @@ class sev_dbhdf5 : public sev_db {
};
#endif
#endif
\ No newline at end of file
#endif
......@@ -185,6 +185,7 @@ int sev_dbms_env::close()
return 0;
}
#if 0
int sev_dbms_env::open(const char *v_host, const char *v_user, const char *v_passwd,
const char *v_dbName, unsigned int v_port, const char *v_socket)
{
......@@ -220,6 +221,7 @@ int sev_dbms_env::open(const char *v_host, const char *v_user, const char *v_pas
return 0;
}
#endif
int sev_dbms_env::create(const char *v_fileName, const char *v_host, const char *v_user,
const char *v_passwd, const char *v_dbName, unsigned int v_port,
......@@ -240,6 +242,12 @@ int sev_dbms_env::create(const char *v_fileName, const char *v_host, const char
MYSQL *sev_dbms_env::createDb(void)
{
if ( mysql_library_init( 0, NULL, NULL)) {
printf( "** Cannot init mysql client library\n");
return 0;
}
m_con = mysql_init(NULL);
MYSQL *con = mysql_real_connect(m_con, host(), user(), passwd(), 0, port(), socket(), 0);
......@@ -497,6 +505,11 @@ MYSQL *sev_dbms_env::openDb( unsigned int *sts)
{
*sts = 0;
if ( mysql_library_init( 0, NULL, NULL)) {
printf( "** Cannot init mysql client library\n");
return 0;
}
m_con = mysql_init(NULL);
MYSQL *con = mysql_real_connect(m_con, host(), user(), passwd(), dbName(), port(), socket(), 0);
......@@ -508,6 +521,19 @@ MYSQL *sev_dbms_env::openDb( unsigned int *sts)
return con;
}
MYSQL *sev_dbms_env::open_thread( unsigned int *sts)
{
MYSQL *con = mysql_init(NULL);
con = mysql_real_connect(m_con, host(), user(), passwd(), dbName(), port(), socket(), 0);
if (con == 0) {
*sts = mysql_errno(m_con);
return 0;
}
return con;
}
int sev_dbms_env::create()
{
struct stat sb;
......@@ -668,36 +694,8 @@ int sev_dbms_env::get_systemname()
return 1;
}
sev_dbms::sev_dbms( sev_dbms_env *env) : m_env(env), m_linearregr_maxtime(0),
m_linearregr_all(0), m_meanvalue1_all(0)
sev_dbms::sev_dbms( sev_dbms_env *env) : m_env(env)
{
char valuestr[20];
int nr;
float ftime;
if ( cnf_get_value( "sevLinearRegrMaxTime", valuestr, sizeof(valuestr))) {
nr = sscanf( valuestr, "%f", &ftime);
if ( nr == 1)
m_linearregr_maxtime = ftime;
}
if ( cnf_get_value( "sevLinearRegrAll", valuestr, sizeof(valuestr))) {
if ( cdh_NoCaseStrcmp( valuestr, "1") == 0)
m_linearregr_all = 1;
}
if ( cnf_get_value( "sevMeanValue1All", valuestr, sizeof(valuestr))) {
if ( cdh_NoCaseStrcmp( valuestr, "1") == 0)
m_meanvalue1_all = 1;
}
if ( cnf_get_value( "sevMeanValueInterval1", valuestr, sizeof(valuestr))) {
nr = sscanf( valuestr, "%f", &ftime);
if ( nr == 1)
m_meanvalue_interval1 = ftime;
}
if ( cnf_get_value( "sevMeanValueInterval2", valuestr, sizeof(valuestr))) {
nr = sscanf( valuestr, "%f", &ftime);
if ( nr == 1)
m_meanvalue_interval2 = ftime;
}
}
sev_db *sev_dbms::open_database()
......@@ -765,9 +763,9 @@ int sev_dbms::create_table( pwr_tStatus *sts, char *tablename, pwr_eType type,
char engine[80];
char enginestr[100] = "";
if ( cnf_get_value( "sevMysqlEngine", engine, sizeof(engine)) != 0)
if ( strcmp( m_cnf.Engine, "") != 0)
snprintf( enginestr, sizeof(enginestr), " engine=%s", engine);
if ( cdh_NoCaseStrcmp( engine, "innodb") == 0)
if ( cdh_NoCaseStrcmp( m_cnf.Engine, "innodb") == 0)
strcat( enginestr, " row_format=compressed");
if ( options & pwr_mSevOptionsMask_PosixTime) {
......@@ -845,7 +843,7 @@ int sev_dbms::create_event_table( pwr_tStatus *sts, char *tablename, pwr_tMask o
char engine[80];
char enginestr[100] = "";
if ( cnf_get_value( "sevMysqlEngine", engine, sizeof(engine)) != 0)
if ( strcmp( m_cnf.Engine, "") != 0)
snprintf( enginestr, sizeof(enginestr), " engine=%s", engine);
if ( options & pwr_mSevOptionsMask_PosixTime) {
......@@ -1028,9 +1026,9 @@ int sev_dbms::get_items( pwr_tStatus *sts)
item.options = strtoul(row[15], 0, 10);
item.attrnum = 1;
if ( m_linearregr_all && item.options & pwr_mSevOptionsMask_UseDeadBand)
if ( m_cnf.LinearRegrAll && item.options & pwr_mSevOptionsMask_UseDeadBand)
item.options |= pwr_mSevOptionsMask_DeadBandLinearRegr;
if ( m_meanvalue1_all)
if ( m_cnf.MeanValueAll)
item.options |= pwr_mSevOptionsMask_MeanValue1;
m_items.push_back( item);
......@@ -1047,9 +1045,16 @@ int sev_dbms::get_items( pwr_tStatus *sts)
return 1;
}
int sev_dbms::store_value( pwr_tStatus *sts, int item_idx, int attr_idx,
int sev_dbms::store_value( pwr_tStatus *sts, void *thread, int item_idx, int attr_idx,
pwr_tTime time, void *buf, unsigned int size)
{
MYSQL *con;
if ( thread)
con = (MYSQL *)thread;
else
con = m_env->con();
tree_update_value( item_idx, time, buf);
if ( m_items[item_idx].options & pwr_mSevOptionsMask_DeadBandLinearRegr) {
......@@ -1076,17 +1081,17 @@ int sev_dbms::store_value( pwr_tStatus *sts, int item_idx, int attr_idx,
default:
return 0;
}
m_items[item_idx].cache->add( value, &time);
m_items[item_idx].cache->evaluate( m_linearregr_maxtime);
m_items[item_idx].cache->add( value, &time, thread);
m_items[item_idx].cache->evaluate( m_cnf.LinearRegrMaxTime, thread);
return 1;
}
else
return write_value( sts, item_idx, attr_idx, time, buf, size);
return write_value( sts, item_idx, attr_idx, time, buf, size, thread);
}
int sev_dbms::write_value( pwr_tStatus *sts, int item_idx, int attr_idx,
pwr_tTime time, void *buf, unsigned int size)
pwr_tTime time, void *buf, unsigned int size, void *thread)
{
if(size != m_items[item_idx].value_size) {
//Something is seriously wrong
......@@ -1095,14 +1100,22 @@ int sev_dbms::write_value( pwr_tStatus *sts, int item_idx, int attr_idx,
*sts = SEV__DBERROR;
return 0;
}
if(m_items[item_idx].attrnum > 1) {
return store_objectvalue(sts, item_idx, attr_idx, time, buf, m_items[item_idx].old_value, size);
return store_objectvalue(sts, thread, item_idx, attr_idx, time, buf,
m_items[item_idx].old_value, size);
}
char query[400];
char bufstr[512];
char timstr[40];
int update_time_only = 0;
int set_jump = 0;
MYSQL *con;
if ( thread)
con = (MYSQL *)thread;
else
con = m_env->con();
if ( !m_items[item_idx].first_storage) {
if ( m_items[item_idx].options & pwr_mSevOptionsMask_UseDeadBand &&
......@@ -1324,7 +1337,7 @@ int sev_dbms::write_value( pwr_tStatus *sts, int item_idx, int attr_idx,
sprintf( query, "update %s set jump = 1 where id = %d",
m_items[item_idx].tablename,
m_items[item_idx].last_id);
int rc = mysql_query( m_env->con(), query);
int rc = mysql_query( con, query);
if (rc) {
printf("In %s row %d:\n", __FILE__, __LINE__);
printf( "Update jump: %s\n", mysql_error(m_env->con()));
......@@ -1433,7 +1446,7 @@ int sev_dbms::write_value( pwr_tStatus *sts, int item_idx, int attr_idx,
}
}
}
int rc = mysql_query( m_env->con(), query);
int rc = mysql_query( con, query);
if (rc) {
// printf( "Store value: %s \"%s\"\n", mysql_error(m_env->con()), query);
*sts = SEV__DBERROR;
......@@ -1478,7 +1491,7 @@ int sev_dbms::write_value( pwr_tStatus *sts, int item_idx, int attr_idx,
}
}
int rc = mysql_query( m_env->con(), query);
int rc = mysql_query( con, query);
if (rc) {
// printf( "Update value: %s\n", mysql_error(m_env->con()));
*sts = SEV__DBERROR;
......@@ -2514,13 +2527,13 @@ int sev_dbms::add_item( pwr_tStatus *sts, pwr_tOid oid, char *oname, char *aname
*idx = m_items.size() - 1;
if ( item.options & pwr_mSevOptionsMask_UseDeadBand &&
m_linearregr_all)
m_cnf.LinearRegrAll)
item.options |= pwr_mSevOptionsMask_DeadBandLinearRegr;
if ( item.options & pwr_mSevOptionsMask_DeadBandLinearRegr)
add_cache( *idx);
if ( m_meanvalue1_all)
if ( m_cnf.MeanValueAll)
item.options |= pwr_mSevOptionsMask_MeanValue1;
*sts = SEV__SUCCESS;
......@@ -2841,7 +2854,7 @@ int sev_dbms::create_objecttable( pwr_tStatus *sts, char *tablename, pwr_tMask o
char engine[80];
char enginestr[100] = "";
if ( cnf_get_value( "sevMysqlEngine", engine, sizeof(engine)) != 0)
if ( strcmp( m_cnf.Engine, "") != 0)
snprintf( enginestr, sizeof(enginestr), " engine=%s", engine);
if ( options & pwr_mSevOptionsMask_PosixTime) {
......@@ -3025,7 +3038,7 @@ pwr_tUInt64 sev_dbms::get_nextAutoIncrement( char *tablename )
}
int sev_dbms::store_objectvalue( pwr_tStatus *sts, int item_idx, int attr_idx,
int sev_dbms::store_objectvalue( pwr_tStatus *sts, void *thread, int item_idx, int attr_idx,
pwr_tTime time, void *buf, void *oldbuf, unsigned int size)
{
void *data = buf;
......@@ -3040,6 +3053,12 @@ int sev_dbms::store_objectvalue( pwr_tStatus *sts, int item_idx, int attr_idx,
char bufstr[512];
char bufInclEscCharstr[1025];
char timstr[40];
MYSQL *con;
if ( thread)
con = (MYSQL *)thread;
else
con = m_env->con();
*sts = time_AtoAscii( &time, time_eFormat_NumDateAndTime, timstr, sizeof(timstr));
if ( EVEN(*sts)) return 0;
......@@ -3091,7 +3110,7 @@ int sev_dbms::store_objectvalue( pwr_tStatus *sts, int item_idx, int attr_idx,
if ( EVEN(*sts)) return 0;
if(m_items[item_idx].attr[i].type == pwr_eType_String ||
m_items[item_idx].attr[i].type == pwr_eType_Text) {
mysql_real_escape_string(m_env->con(), bufInclEscCharstr, bufstr, strlen(bufstr) );
mysql_real_escape_string(con, bufInclEscCharstr, bufstr, strlen(bufstr) );
valuesStr.append("'");
valuesStr.append(bufInclEscCharstr);
valuesStr.append("',");
......@@ -3180,19 +3199,19 @@ int sev_dbms::store_objectvalue( pwr_tStatus *sts, int item_idx, int attr_idx,
sprintf( query, "update %s set sev__jump = 1 where sev__id = %d",
m_items[item_idx].tablename,
m_items[item_idx].last_id);
int rc = mysql_query( m_env->con(), query);
int rc = mysql_query( con, query);
if (rc) {
printf("In %s row %d:\n", __FILE__, __LINE__);
printf( "Update jump: %s\n", mysql_error(m_env->con()));
printf( "Update jump: %s\n", mysql_error(con));
}
}
//printf( "Store_objectvalue: %s\n", queryOStr.str().c_str());
int rc = mysql_query( m_env->con(), queryOStr.str().c_str());
int rc = mysql_query( con, queryOStr.str().c_str());
if (rc) {
printf("In %s row %d:\n", __FILE__, __LINE__);
printf( "%s: %s\n", __FUNCTION__, mysql_error(m_env->con()));
printf( "%s: %s\n", __FUNCTION__, mysql_error(con));
printf( "Error in: %s\n", queryOStr.str().c_str());
*sts = SEV__DBERROR;
......@@ -3200,14 +3219,14 @@ int sev_dbms::store_objectvalue( pwr_tStatus *sts, int item_idx, int attr_idx,
if ( m_items[item_idx].status != m_items[item_idx].logged_status) {
m_items[item_idx].logged_status = m_items[item_idx].status;
errh_Error( "Database update error: %s, table: %s object: %s",
mysql_error(m_env->con()), m_items[item_idx].tablename, m_items[item_idx].oname);
mysql_error(con), m_items[item_idx].tablename, m_items[item_idx].oname);
}
return 0;
}
if ( (m_items[item_idx].options & pwr_mSevOptionsMask_ReadOptimized) &&
!updateOnlyTime)
m_items[item_idx].last_id = mysql_insert_id( m_env->con());
m_items[item_idx].last_id = mysql_insert_id( con);
m_items[item_idx].first_storage = 0;
......@@ -4337,7 +4356,7 @@ int sev_dbms::alter_engine( pwr_tStatus *sts, char *tablename)
int rc;
char engine[80];
if ( cnf_get_value( "sevMysqlEngine", engine, sizeof(engine)) == 0) {
if ( strcmp( m_cnf.Engine, "") == 0) {
printf( "** No engine specified in /etc/proview.cnf\n");
return 0;
}
......@@ -4398,7 +4417,7 @@ int sev_dbms::store_stat( sev_sStat *stat)
return 1;
}
void sev_dbms::write_db_cb( void *data, int idx, void *value, pwr_tTime *time)
void sev_dbms::write_db_cb( void *data, int idx, void *value, pwr_tTime *time, void *thread)
{
pwr_tStatus sts;
sev_dbms *dbms = (sev_dbms *)data;
......@@ -4406,22 +4425,22 @@ void sev_dbms::write_db_cb( void *data, int idx, void *value, pwr_tTime *time)
switch( dbms->m_items[idx].attr[0].type) {
case pwr_eType_Float32: {
pwr_tFloat32 v = *(double *)value;
dbms->write_value( &sts, idx, 0, *time, &v, sizeof(v));
dbms->write_value( &sts, idx, 0, *time, &v, sizeof(v), thread);
break;
}
case pwr_eType_Float64: {
pwr_tFloat64 v = *(double *)value;
dbms->write_value( &sts, idx, 0, *time, &v, sizeof(v));
dbms->write_value( &sts, idx, 0, *time, &v, sizeof(v), thread);
break;
}
case pwr_eType_Int32: {
pwr_tInt32 v = *(double *)value;
dbms->write_value( &sts, idx, 0, *time, &v, sizeof(v));
dbms->write_value( &sts, idx, 0, *time, &v, sizeof(v), thread);
break;
}
case pwr_eType_Boolean: {
pwr_tBoolean v = *(pwr_tBoolean *)value;
dbms->write_value( &sts, idx, 0, *time, &v, sizeof(v));
dbms->write_value( &sts, idx, 0, *time, &v, sizeof(v), thread);
break;
}
default: ;
......@@ -4450,31 +4469,43 @@ void sev_dbms::add_cache( int item_idx)
}
}
int sev_dbms::begin_transaction()
int sev_dbms::begin_transaction( void *thread)
{
char query[20];
int rc;
MYSQL *con;
if ( thread)
con = (MYSQL *)thread;
else
con = m_env->con();
strcpy( query, "start transaction");
rc = mysql_query( m_env->con(), query);
rc = mysql_query( con, query);
if (rc) {
printf("In %s row %d:\n", __FILE__, __LINE__);
printf( "Begin transaction: %s\n", mysql_error(m_env->con()));
printf( "Begin transaction: %s\n", mysql_error(con));
return 0;
}
return 1;
}
int sev_dbms::commit_transaction()
int sev_dbms::commit_transaction( void *thread)
{
char query[20];
int rc;
MYSQL *con;
if ( thread)
con = (MYSQL *)thread;
else
con = m_env->con();
strcpy( query, "commit");
rc = mysql_query( m_env->con(), query);
rc = mysql_query( con, query);
if (rc) {
printf("In %s row %d:\n", __FILE__, __LINE__);
printf( "Begin transaction: %s\n", mysql_error(m_env->con()));
printf( "Begin transaction: %s\n", mysql_error(con));
return 0;
}
return 1;
......@@ -4510,6 +4541,11 @@ void sev_dbms::mysqlstring_to_string( char *in, char *out, int size)
*t = 0;
}
void *sev_dbms::new_thread()
{
unsigned int sts;
return m_env->open_thread( &sts);
}
sev_dbms::~sev_dbms()
{
......@@ -4517,7 +4553,7 @@ sev_dbms::~sev_dbms()
for(size_t idx = 0; idx < m_items.size(); idx++) {
if ( m_items[idx].cache)
// Write last value
m_items[idx].cache->write(0);
m_items[idx].cache->write(0, 0);
if( m_items[idx].old_value != 0 ) {
free(m_items[idx].old_value);
m_items[idx].old_value = 0;
......
......@@ -49,7 +49,6 @@ using namespace std;
class sev_dbms_env;
class sev_dbms_env
{
public:
......@@ -81,8 +80,10 @@ class sev_dbms_env
int open(void);
int open(const char *fileName);
#if 0
int open(const char *host, const char *user, const char *passwd,
const char *dbName, unsigned int port, const char *socket);
#endif
int checkAndUpdateVersion(unsigned int version);
int updateDBToSevVersion2(void);
......@@ -90,6 +91,7 @@ class sev_dbms_env
int createSevVersion3Tables(void);
MYSQL *createDb(void);
MYSQL *openDb(unsigned int *sts);
MYSQL *open_thread( unsigned int *sts);
bool exists() { return m_exists;}
int close(void);
static int get_systemname();
......@@ -125,9 +127,6 @@ class sev_dbms : public sev_db {
static const unsigned int constMaxColNameLength = 64;
sev_dbms_env *m_env;
double m_linearregr_maxtime;
int m_linearregr_all;
int m_meanvalue1_all;
sev_dbms( sev_dbms_env *env);
~sev_dbms();
......@@ -140,10 +139,10 @@ class sev_dbms : public sev_db {
pwr_tDeltaTime storagetime, pwr_eType type, unsigned int size,
char *description, char *unit, pwr_tFloat32 scantime,
pwr_tFloat32 deadband, pwr_tMask options, unsigned int *idx);
int store_value( pwr_tStatus *sts, int item_idx, int attr_idx,
int store_value( pwr_tStatus *sts, void *thread, int item_idx, int attr_idx,
pwr_tTime time, void *buf, unsigned int size);
int write_value( pwr_tStatus *sts, int item_idx, int attr_idx,
pwr_tTime time, void *buf, unsigned int size);
pwr_tTime time, void *buf, unsigned int size, void *thread);
int get_values( pwr_tStatus *sts, pwr_tOid oid, pwr_tMask options, float deadband, char *aname,
pwr_eType type, unsigned int size, pwr_tFloat32 scantime, pwr_tTime *creatime,
pwr_tTime *starttime,
......@@ -166,9 +165,9 @@ class sev_dbms : public sev_db {
char *dbName() { return sev_dbms_env::dbName();}
char *pwrtype_to_type( pwr_eType type, unsigned int size);
static int timestr_to_time( char *tstr, pwr_tTime *ts);
static void write_db_cb( void *data, int idx, void *value, pwr_tTime *time);
int check_objectitem( pwr_tStatus *sts, char *tablename, pwr_tOid oid, char *oname, char *aname,
pwr_tDeltaTime storagetime,
static void write_db_cb( void *data, int idx, void *value, pwr_tTime *time, void *thread);
int check_objectitem( pwr_tStatus *sts, char *tablename, pwr_tOid oid, char *oname, char *aname,
pwr_tDeltaTime storagetime,
char *description, pwr_tFloat32 scantime,
pwr_tFloat32 deadband, pwr_tMask options, unsigned int attrnum,
sev_sHistAttr *attr, unsigned int *idx);
......@@ -181,8 +180,8 @@ class sev_dbms : public sev_db {
pwr_tDeltaTime storagetime, char *description, pwr_tFloat32 scantime,
pwr_tFloat32 deadband, pwr_tMask options);
int create_objecttable( pwr_tStatus *sts, char *tablename, pwr_tMask options, float deadband);
int store_objectvalue( pwr_tStatus *sts, int item_idx, int attr_idx,
pwr_tTime time, void *buf, void *oldbuf, unsigned int size);
int store_objectvalue( pwr_tStatus *sts, void *thread, int item_idx, int attr_idx,
pwr_tTime time, void *buf, void *oldbuf, unsigned int size);
int get_item( pwr_tStatus *sts, sev_item *item, pwr_tOid oid, char *attributename);
int get_objectitem( pwr_tStatus *sts, sev_item *item, pwr_tOid oid, char *attributename);
int get_objectitems( pwr_tStatus *sts);
......@@ -207,8 +206,9 @@ class sev_dbms : public sev_db {
int optimize( pwr_tStatus *sts, char *tablename);
int store_stat( sev_sStat *stat);
void add_cache( int item_idx);
int begin_transaction();
int commit_transaction();
int begin_transaction( void *thread);
int commit_transaction( void *thread);
void *new_thread();
int get_closest_time( char *tablename, unsigned int options, pwr_tTime *time, int before,
unsigned int *id);
void string_to_mysqlstring( char *in, char *out, int size);
......
......@@ -595,7 +595,7 @@ int sev_dbsqlite::get_items( pwr_tStatus *sts)
return 1;
}
int sev_dbsqlite::store_value( pwr_tStatus *sts, int item_idx, int attr_idx,
int sev_dbsqlite::store_value( pwr_tStatus *sts, void *thread, int item_idx, int attr_idx,
pwr_tTime time, void *buf, unsigned int size)
{
if(size != m_items[item_idx].value_size) {
......@@ -3361,7 +3361,7 @@ int sev_dbsqlite::store_stat( sev_sStat *stat)
return 1;
}
int sev_dbsqlite::begin_transaction()
int sev_dbsqlite::begin_transaction( void *thread)
{
char query[20];
char *errmsg;
......@@ -3378,7 +3378,7 @@ int sev_dbsqlite::begin_transaction()
return 1;
}
int sev_dbsqlite::commit_transaction()
int sev_dbsqlite::commit_transaction( void *thread)
{
char query[20];
char *errmsg;
......@@ -3409,4 +3409,4 @@ sev_dbsqlite::~sev_dbsqlite()
#else
extern int no_sev_dbsqlite;
int no_sev_dbsqlite = 0;
#endif
\ No newline at end of file
#endif
......@@ -75,7 +75,7 @@ class sev_dbsqlite : public sev_db {
pwr_tDeltaTime storagetime, pwr_eType type, unsigned int size,
char *description, char *unit, pwr_tFloat32 scantime,
pwr_tFloat32 deadband, pwr_tMask options, unsigned int *idx);
int store_value( pwr_tStatus *sts, int item_idx, int attr_idx,
int store_value( pwr_tStatus *sts, void *thread, int item_idx, int attr_idx,
pwr_tTime time, void *buf, unsigned int size);
int get_values( pwr_tStatus *sts, pwr_tOid oid, pwr_tMask options, float deadband, char *aname,
pwr_eType type, unsigned int size, pwr_tFloat32 scantime, pwr_tTime *creatime,
......@@ -137,8 +137,8 @@ class sev_dbsqlite : public sev_db {
int repair_table( pwr_tStatus *sts, char *tablename);
int alter_engine( pwr_tStatus *sts, char *tablename);
int store_stat( sev_sStat *stat);
int begin_transaction();
int commit_transaction();
int begin_transaction( void *thread);
int commit_transaction( void *thread);
inline char* create_colName(unsigned int index, char *attributename) {
static char colName[constMaxColNameLength];
strncpy(colName, attributename, constMaxColNameLength);
......@@ -151,4 +151,4 @@ class sev_dbsqlite : public sev_db {
}
};
#endif
#endif
\ No newline at end of file
#endif
......@@ -64,7 +64,7 @@ sev_sCacheValueDouble& sev_valuecache_double::operator[]( const int index)
return m_val[idx(index)];
}
void sev_valuecache_double::add( void *value, pwr_tTime *t)
void sev_valuecache_double::add( void *value, pwr_tTime *t, void *thread)
{
double val = *(double *)value;
double time;
......@@ -96,7 +96,7 @@ void sev_valuecache_double::add( void *value, pwr_tTime *t)
}
}
if ( !m_inited) {
write( 0);
write( 0, thread);
m_inited = true;
return;
}
......@@ -110,7 +110,7 @@ void sev_valuecache_double::add( void *value, pwr_tTime *t)
calculate_epsilon(0);
}
void sev_valuecache_double::evaluate( double maxtime)
void sev_valuecache_double::evaluate( double maxtime, void *thread)
{
int value_added = 1;
......@@ -118,7 +118,7 @@ void sev_valuecache_double::evaluate( double maxtime)
if ( (maxtime != 0 && (m_val[m_last].time - m_wval.time) > maxtime) ||
!check_deadband()) {
// Store optimal value
write( m_last_opt_write + value_added);
write( m_last_opt_write + value_added, thread);
}
else
break;
......@@ -158,7 +158,7 @@ void sev_valuecache_double::calculate_k()
}
}
void sev_valuecache_double::write( int index)
void sev_valuecache_double::write( int index, void *thread)
{
int ii = idx(index);
double wval, wtime;
......@@ -196,7 +196,7 @@ void sev_valuecache_double::write( int index)
if ( m_write_cb) {
pwr_tTime time;
time_Aadd( &time, &m_start_time, time_Float64ToD( 0, wtime));
(m_write_cb)( m_userdata, m_useridx, &wval, &time);
(m_write_cb)( m_userdata, m_useridx, &wval, &time, thread);
}
}
......@@ -274,29 +274,29 @@ int sev_valuecache_double::get_optimal_write()
return min_idx;
}
void sev_valuecache_bool::add( void *value, pwr_tTime *t)
void sev_valuecache_bool::add( void *value, pwr_tTime *t, void *thread)
{
m_val.val = *(pwr_tBoolean *)value;
m_val.time = *t;
if ( !m_inited) {
// Store valeu
write( 0);
// Store value
write( 0, thread);
m_inited = true;
}
}
void sev_valuecache_bool::evaluate( double maxtime)
void sev_valuecache_bool::evaluate( double maxtime, void *thread)
{
if ( m_val.val != m_wval.val) {
write(0);
write(0, thread);
}
}
void sev_valuecache_bool::write( int index)
void sev_valuecache_bool::write( int index, void *thread)
{
m_wval.val = m_val.val;
m_wval.time = m_val.time;
if ( m_write_cb)
(m_write_cb)( m_userdata, m_useridx, &m_wval.val, &m_wval.time);
(m_write_cb)( m_userdata, m_useridx, &m_wval.val, &m_wval.time, thread);
}
......@@ -63,16 +63,16 @@ class sev_valuecache {
sev_eCvType m_type;
void *m_userdata;
int m_useridx;
void (*m_write_cb)( void *, int , void *, pwr_tTime *);
void (*m_write_cb)( void *, int , void *, pwr_tTime *, void *);
sev_valuecache( sev_eCvType type) : m_type(type), m_userdata(0), m_useridx(0), m_write_cb(0) {}
sev_valuecache( const sev_valuecache& x) : m_type(x.m_type), m_userdata(x.m_userdata), m_useridx(x.m_useridx),
m_write_cb(x.m_write_cb) {}
virtual ~sev_valuecache() {};
virtual void add( void *value, pwr_tTime *time) {};
virtual void evaluate( double maxtime) {};
virtual void write( int index) {};
virtual void set_write_cb( void (*write_cb)( void *, int, void *, pwr_tTime *), void *userdata, int idx) {
virtual void add( void *value, pwr_tTime *time, void *thread) {};
virtual void evaluate( double maxtime, void *thread) {};
virtual void write( int index, void *thread) {};
virtual void set_write_cb( void (*write_cb)( void *, int, void *, pwr_tTime *, void *), void *userdata, int idx) {
m_write_cb = write_cb;
m_userdata = userdata;
m_useridx = idx;
......@@ -117,10 +117,10 @@ class sev_valuecache_double : public sev_valuecache {
int idx( int index);
sev_sCacheValueDouble& operator[]( const int index);
sev_sCacheValueDouble& wval() { return m_wval;}
void add( void *value, pwr_tTime *time);
void evaluate( double maxtime);
void add( void *value, pwr_tTime *time, void *thread);
void evaluate( double maxtime, void *thread);
void calculate_k();
void write( int index);
void write( int index, void *thread);
void calculate_epsilon();
void calculate_epsilon( int index);
bool check_deadband( int index);
......@@ -146,9 +146,9 @@ class sev_valuecache_bool : public sev_valuecache {
}
~sev_valuecache_bool() {}
sev_sCacheValueBool& wval() { return m_wval;}
void add( void *value, pwr_tTime *time);
void evaluate( double maxtime);
void write( int index);
void add( void *value, pwr_tTime *time, void *thread);
void evaluate( double maxtime, void *thread);
void write( int index, void *thread);
};
......
......@@ -669,8 +669,10 @@ int rt_sevhistmon::send_data()
put.allocate = 0;
msg->Type = sev_eMsgType_HistDataStore;
msg->Version = sev_cNetVersion;
time_GetTime( &current_time);
msg->Time = net_TimeToNetTime( &current_time);
msg->ServerThread = m_hs[i].threadp->ServerThread;
dp = (sev_sHistData *) &msg->Data;
for ( unsigned int j = 0; j < m_hs[i].sevhistlist.size(); j++) {
......@@ -1503,4 +1505,4 @@ int main()
client.mainloop();
client.close();
}
\ No newline at end of file
}
......@@ -45,6 +45,10 @@
# include "rt_sync.h"
#endif
#if defined __cplusplus
extern "C" {
#endif
typedef struct lst_sEntry lst_sEntry;
struct lst_sEntry {
lst_sEntry *flink;
......@@ -67,4 +71,7 @@ void * lst_RemovePred (thread_sMutex*, lst_sEntry*, lst_sEntry**);
void * lst_RemoveSucc (thread_sMutex*, lst_sEntry*, lst_sEntry**);
void * lst_Succ (thread_sMutex*, lst_sEntry*, lst_sEntry**);
#endif
\ No newline at end of file
#if defined __cplusplus
}
#endif
#endif
......@@ -41,6 +41,10 @@
#include "rt_sync.h"
#include "rt_lst.h"
#if defined __cplusplus
extern "C" {
#endif
typedef struct {
thread_sMutex mutex;
thread_sCond cond;
......@@ -51,4 +55,7 @@ que_sQue * que_Create(pwr_tStatus*, que_sQue*);
void * que_Get(pwr_tStatus*, que_sQue*, pwr_tDeltaTime*, void*);
void que_Put(pwr_tStatus*, que_sQue*, lst_sEntry*, void*);
#endif
\ No newline at end of file
#if defined __cplusplus
}
#endif
#endif
......@@ -48,6 +48,7 @@ extern "C"
#define sev_eProcSevClient 121
#define sev_eProcSevServer 122
#define sev_cMsgClass 202
#define sev_cNetVersion 1
typedef enum {
sev_eMsgType_NodeUp,
......@@ -118,11 +119,13 @@ typedef struct {
// Message types
typedef struct {
sev_eMsgType Type;
pwr_tUInt16 Type;
pwr_tUInt16 Version;
} sev_sMsgAny;
typedef struct {
sev_eMsgType Type;
pwr_tUInt16 Type;
pwr_tUInt16 Version;
pwr_tStatus Status;
unsigned int NumItems;
unsigned int NumAttributes;
......@@ -130,20 +133,31 @@ typedef struct {
} sev_sMsgHistItems;
typedef struct {
sev_eMsgType Type;
net_sTime Time;
pwr_tUInt16 Type;
pwr_tUInt16 Version;
net_sTime Time;
pwr_tUInt32 ServerThread;
int Data[1];
} sev_sMsgHistDataStore;
typedef struct {
sev_eMsgType Type;
pwr_tUInt16 Type;
pwr_tUInt16 Version;
net_sTime Time;
int Data[1];
} sev_sMsgHistDataStoreV0;
typedef struct {
pwr_tUInt16 Type;
pwr_tUInt16 Version;
pwr_tOid Oid;
unsigned int NumEvents;
sev_sEvent Events[1];
} sev_sMsgEventsStore;
typedef struct {
sev_eMsgType Type;
pwr_tUInt16 Type;
pwr_tUInt16 Version;
pwr_tOid Oid;
pwr_tOName AName;
net_sTime StartTime;
......@@ -152,7 +166,8 @@ typedef struct {
} sev_sMsgHistDataGetRequest;
typedef struct {
sev_eMsgType Type;
pwr_tUInt16 Type;
pwr_tUInt16 Version;
pwr_tOid Oid;
pwr_tOName AName;
pwr_tStatus Status;
......@@ -163,32 +178,36 @@ typedef struct {
} sev_sMsgHistDataGet;
typedef struct {
sev_eMsgType Type;
pwr_tOid Oid;
pwr_tOName AName;
pwr_tStatus Status;
int NumPoints;
int NumAttributes;
unsigned int TotalDataSize;
sev_sHistAttr Attr[1];
int Data[1];
pwr_tUInt16 Type;
pwr_tUInt16 Version;
pwr_tOid Oid;
pwr_tOName AName;
pwr_tStatus Status;
int NumPoints;
int NumAttributes;
unsigned int TotalDataSize;
sev_sHistAttr Attr[1];
int Data[1];
} sev_sMsgHistObjectDataGet;
typedef struct {
sev_eMsgType Type;
pwr_tUInt16 Type;
pwr_tUInt16 Version;
pwr_tOid Oid;
pwr_tOName AName;
} sev_sMsgHistItemDelete;
typedef struct {
sev_eMsgType Type;
pwr_tUInt16 Type;
pwr_tUInt16 Version;
pwr_tOid Oid;
pwr_tOName AName;
pwr_tStatus Status;
} sev_sMsgHistItemStatus;
typedef struct {
sev_eMsgType Type;
pwr_tUInt16 Type;
pwr_tUInt16 Version;
pwr_tStatus Status;
} sev_sMsgServerStatus;
......@@ -196,4 +215,4 @@ typedef struct {
}
#endif
#endif
\ No newline at end of file
#endif
......@@ -175,6 +175,7 @@ int sevcli_get_itemlist( pwr_tStatus *sts, sevcli_tCtx ctx, sevcli_sHistItem **l
put.data = msg;
msg->Type = sev_eMsgType_HistItemsRequest;
msg->Version = sev_cNetVersion;
if ( !qcom_Put( sts, &tgt, &put)) {
qcom_Free( &lsts, put.data);
......@@ -272,6 +273,7 @@ int sevcli_get_itemlist( pwr_tStatus *sts, sevcli_tCtx ctx, sevcli_sHistItem **l
put.data = msg;
msg->Type = sev_eMsgType_HistItemsRequest;
msg->Version = sev_cNetVersion;
if ( !qcom_Put( sts, &tgt, &put)) {
qcom_Free( &lsts, put.data);
......@@ -381,6 +383,7 @@ int sevcli_get_itemdata( pwr_tStatus *sts, sevcli_tCtx ctx, pwr_tOid oid,
put.data = msg;
msg->Type = sev_eMsgType_HistDataGetRequest;
msg->Version = sev_cNetVersion;
msg->Oid = oid;
strncpy( msg->AName, aname, sizeof(msg->AName));
msg->StartTime = net_TimeToNetTime( &starttime);
......@@ -486,6 +489,7 @@ int sevcli_get_objectitemdata( pwr_tStatus *sts, sevcli_tCtx ctx, pwr_tOid oid,
put.data = msg;
msg->Type = sev_eMsgType_HistObjectDataGetRequest;
msg->Version = sev_cNetVersion;
msg->Oid = oid;
strncpy( msg->AName, aname, sizeof(msg->AName));
msg->StartTime = net_TimeToNetTime( &starttime);
......@@ -600,6 +604,7 @@ int sevcli_delete_item( pwr_tStatus *sts, sevcli_tCtx ctx, pwr_tOid oid, char *a
put.data = msg;
msg->Type = sev_eMsgType_HistItemDelete;
msg->Version = sev_cNetVersion;
msg->Oid = oid;
strncpy( msg->AName, aname, sizeof(msg->AName));
......
......@@ -40,10 +40,17 @@
#include "pwr.h"
#include "rt_thread.h"
#if defined __cplusplus
extern "C" {
#endif
pwr_tStatus sync_CondInit (thread_sCond*);
pwr_tStatus sync_MutexInit (thread_sMutex*);
pwr_tStatus sync_CondSignal (thread_sCond*);
pwr_tStatus sync_MutexLock (thread_sMutex*);
pwr_tStatus sync_MutexUnlock (thread_sMutex*);
#endif
\ No newline at end of file
#if defined __cplusplus
}
#endif
#endif
......@@ -42,6 +42,10 @@
#if defined OS_POSIX
#include <pthread.h>
#if defined __cplusplus
extern "C" {
#endif
typedef pthread_t thread_s;
typedef struct {
......@@ -76,4 +80,7 @@ pwr_tStatus thread_Join (thread_s*, pwr_tStatus*);
pwr_tStatus thread_Signal (thread_s*, int);
pwr_tBoolean thread_SigTimedWait (thread_s*, int, pwr_tDeltaTime*);
#endif
\ No newline at end of file
#if defined __cplusplus
}
#endif
#endif
!
! Proview Open Source Process Control.
! Copyright (C) 2005-2017 SSAB EMEA AB.
!
! This file is part of Proview.
!
! This program is free software; you can redistribute it and/or
! modify it under the terms of the GNU General Public License as
! published by the Free Software Foundation, either version 2 of
! the License, or (at your option) any later version.
!
! This program is distributed in the hope that it will be useful
! but WITHOUT ANY WARRANTY; without even the implied warranty of
! MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
! GNU General Public License for more details.
!
! You should have received a copy of the GNU General Public License
! along with Proview. If not, see <http://www.gnu.org/licenses/>
!
! Linking Proview statically or dynamically with other modules is
! making a combined work based on Proview. Thus, the terms and
! conditions of the GNU General Public License cover the whole
! combination.
!
! In addition, as a special exception, the copyright holders of
! Proview give you permission to, from the build function in the
! Proview Configurator, combine Proview with modules generated by the
! Proview PLC Editor to a PLC program, regardless of the license
! terms of these modules. You may copy and distribute the resulting
! combined work under the terms of your choice, provided that every
! copy of the combined work is accompanied by a complete copy of
! the source code of Proview (the version used to produce the
! combined work), being distributed under the terms of the GNU
! General Public License plus this exception.
!
! pwrb_c_a_sevserverthread.wb_load -- Defines the class SevServerThread.
!
SObject pwrb:Class
!/**
! @Version 1.0
! Displays sev server thread info.
!
! @b See also
! @classlink SevServer pwrb_sevserver.html
!*/
Object SevServerThread $ClassDef 699
Body SysBody
Attr Editor = pwr_eEditor_AttrEd
Attr Method = pwr_eMethod_Standard
Attr Flags |= pwr_mClassDef_Internal
EndBody
Object RtBody $ObjBodyDef 1
Body SysBody
Attr StructName = "SevServerThread"
EndBody
!/**
! Thread key.
!*/
Object Key $Attribute 1
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_STATE
EndBody
EndObject
!/**
! Allocated size in thread queue.
!*/
Object QueueAlloc $Attribute 2
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_STATE
EndBody
EndObject
!/**
! Lost messages.
!*/
Object LostCnt $Attribute 3
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_STATE
EndBody
EndObject
!/**
! Medium load in percentage.
!*/
Object MediumLoad $Attribute 4
Body SysBody
Attr TypeRef = "pwrs:Type-$Float32"
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_STATE
EndBody
EndObject
!/**
! Storage rate. Values per second.
!*/
Object StorageRate $Attribute 5
Body SysBody
Attr TypeRef = "pwrs:Type-$Float32"
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_STATE
EndBody
EndObject
!/**
! Medium storage rate. Values per second.
!*/
Object MediumStorageRate $Attribute 6
Body SysBody
Attr TypeRef = "pwrs:Type-$Float32"
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_STATE
EndBody
EndObject
!/**
! Data store message count.
!*/
Object DataStoreMsgCnt $Attribute 7
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_STATE
EndBody
EndObject
!/**
! Event store message count.
!*/
Object EventStoreMsgCnt $Attribute 8
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
Attr Flags |= PWR_MASK_NOEDIT
Attr Flags |= PWR_MASK_STATE
EndBody
EndObject
EndObject
EndObject
EndSObject
\ No newline at end of file
......@@ -80,6 +80,16 @@ SObject pwrb:Class
EndBody
EndObject
!/**
! Thread in server node to handle the storage.
! An integer value. Threads with equal ServerThread will be
! handled by the same server thread.
!*/
Object ServerThread $Attribute 9
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
EndBody
EndObject
!/**
! Specifies, in seconds, the time between two samples.
! The minimum allowed value is 1 seconds.
!*/
......
......@@ -82,14 +82,93 @@ SObject pwrb:Class
EndBody
EndObject
!/**
! UseThreads.
!*/
Object UseServerThreads $Attribute 3
Body SysBody
Attr TypeRef = "pwrs:Type-$Boolean"
EndBody
EndObject
!/**
! Calculate mean value on all items.
!*/
Object MeanValueAll $Attribute 4
Body SysBody
Attr TypeRef = "pwrs:Type-$Boolean"
EndBody
EndObject
!/**
! Use linear regression on all items with deadband.
!*/
Object LinearRegrAll $Attribute 5
Body SysBody
Attr TypeRef = "pwrs:Type-$Boolean"
EndBody
EndObject
!/**
! Max time without storage for items with deadband linear regression.
! If zero, time is infinite.
!*/
Object LinearRegrMaxTime $Attribute 6
Body SysBody
Attr TypeRef = "pwrs:Type-$Float32"
EndBody
EndObject
!/**
! Interval for calcuation of mean value for items
! with the MeanValue1 bit set in options.
!*/
Object MeanValueInterval1 $Attribute 7
Body SysBody
Attr TypeRef = "pwrs:Type-$Float32"
EndBody
EndObject
!/**
! Interval for calcuation of mean value for items
! with the MeanValue2 bit set in options.
!*/
Object MeanValueInterval2 $Attribute 8
Body SysBody
Attr TypeRef = "pwrs:Type-$Float32"
EndBody
EndObject
!/**
! Max limit for thread queue size.
! If the queue limit is exceeded, messages are lost.
!*/
Object ThreadQueueLimit $Attribute 9
Body SysBody
Attr TypeRef = "pwrs:Type-$UInt32"
EndBody
EndObject
!/**
! Statistics.
!*/
Object Stat $Attribute 3
Object Stat $Attribute 10
Body SysBody
Attr TypeRef = "pwrb:Class-SevStatistics"
Attr Flags |= PWR_MASK_CLASS
EndBody
EndObject
!/**
! Server threads.
!*/
Object ServerThreads $Attribute 11
Body SysBody
Attr TypeRef = "pwrb:Class-SevServerThread"
Attr Flags |= PWR_MASK_CLASS
Attr Flags |= PWR_MASK_ARRAY
Attr Elements = 20
EndBody
EndObject
EndObject
Object Template SevServer
Body RtBody
Attr UseServerThreads = 1
Attr MeanValueInterval1 = 10
Attr MeanValueInterval2 = 30
Attr ThreadQueueLimit = 600000
EndBody
EndObject
EndObject
EndSObject
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment