Commit ccef4c7b authored by Claes Sjofors's avatar Claes Sjofors

sev_server get_values, start and end points added and memory leakage fix

parent ec6c4107
...@@ -574,6 +574,13 @@ int sev_server::mainloop() ...@@ -574,6 +574,13 @@ int sev_server::mainloop()
garbage_collector(0); garbage_collector(0);
time_Aadd(&next_garco, &next_garco, &garco_interval); time_Aadd(&next_garco, &next_garco, &garco_interval);
} }
if (sts == QCOM__ALLOCQUOTA) {
struct timespec r, t;
t.tv_sec = 0;
t.tv_nsec = tmo * 1000000;
nanosleep(&t, &r);
continue;
}
if (sts == QCOM__TMO || !mp) if (sts == QCOM__TMO || !mp)
continue; continue;
...@@ -965,6 +972,7 @@ int sev_server::send_histdata( ...@@ -965,6 +972,7 @@ int sev_server::send_histdata(
qcom_sQid tgt, sev_sMsgHistDataGetRequest* rmsg, unsigned int size) qcom_sQid tgt, sev_sMsgHistDataGetRequest* rmsg, unsigned int size)
{ {
pthread_t thread; pthread_t thread;
pthread_attr_t attr;
int sts; int sts;
sev_sHistDataThread* arg = (sev_sHistDataThread*)malloc(sizeof(*arg)); sev_sHistDataThread* arg = (sev_sHistDataThread*)malloc(sizeof(*arg));
...@@ -974,8 +982,11 @@ int sev_server::send_histdata( ...@@ -974,8 +982,11 @@ int sev_server::send_histdata(
arg->size = size; arg->size = size;
if (m_read_threads) { if (m_read_threads) {
printf("New read thread\n"); static int pcnt = 0;
sts = pthread_create(&thread, NULL, send_histdata_thread, arg); printf("New read thread %d\n", pcnt++);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
sts = pthread_create(&thread, &attr, send_histdata_thread, arg);
if (sts != 0) if (sts != 0)
printf("pthread_create error %d\n", sts); printf("pthread_create error %d\n", sts);
} else { } else {
...@@ -1062,11 +1073,13 @@ void* sev_server::send_histdata_thread(void* arg) ...@@ -1062,11 +1073,13 @@ void* sev_server::send_histdata_thread(void* arg)
if (!qcom_Put(&sts, &tgt, &put)) { if (!qcom_Put(&sts, &tgt, &put)) {
qcom_Free(&sts, put.data); qcom_Free(&sts, put.data);
} }
free(tbuf);
free(vbuf);
if (sev->m_read_threads) if (sev->m_read_threads)
sev->m_db->delete_thread(thread); sev->m_db->delete_thread(thread);
// pthread_exit( (void *) 1); pthread_exit( (void *) 1);
return (void*)1; return (void*)1;
} }
......
...@@ -639,6 +639,7 @@ MYSQL* sev_dbms_env::open_thread(unsigned int* sts) ...@@ -639,6 +639,7 @@ MYSQL* sev_dbms_env::open_thread(unsigned int* sts)
void sev_dbms_env::close_thread(MYSQL* con) void sev_dbms_env::close_thread(MYSQL* con)
{ {
mysql_close(con); mysql_close(con);
mysql_thread_end();
} }
bool sev_dbms_env::exists() bool sev_dbms_env::exists()
...@@ -1956,6 +1957,10 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid, ...@@ -1956,6 +1957,10 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid,
char where_part[200]; char where_part[200];
int rows = 0; int rows = 0;
MYSQL* con; MYSQL* con;
char startval[8];
char endval[8];
char *startvalue = 0;
char *endvalue = 0;
if (thread) if (thread)
con = (MYSQL*)thread; con = (MYSQL*)thread;
...@@ -2009,16 +2014,39 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid, ...@@ -2009,16 +2014,39 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid,
unsigned int endid; unsigned int endid;
if (starttime) { if (starttime) {
if (type == pwr_eType_Boolean) {
// Get id for starttime // Get id for starttime
*sts = get_closest_time( *sts = get_closest_time(
thread, item.tablename, item.options, starttime, 1, &startid); thread, item.tablename, item.options, starttime, 1, &startid);
if (*sts == SEV__NOROWS) if (*sts == SEV__NOROWS)
get_id_range(sts, thread, &item, item.options, &startid, 0); get_id_range(sts, thread, &item, item.options, &startid, 0);
get_id_value(thread, item.tablename, startid, type, size,
startval);
startvalue = startval;
} else {
// Get id for starttime
*sts = get_closest_time(
thread, item.tablename, item.options, starttime, 1, &startid);
if (*sts == SEV__NOROWS)
get_id_range(sts, thread, &item, item.options, &startid, 0);
}
} else { } else {
get_id_range(sts, thread, &item, item.options, &startid, 0); get_id_range(sts, thread, &item, item.options, &startid, 0);
// startid = 0; // startid = 0;
} }
if (endtime) { if (endtime) {
if (type == pwr_eType_Boolean) {
// Get id for starttime
*sts = get_closest_time(
thread, item.tablename, item.options, endtime, 1, &endid);
if (*sts == SEV__NOROWS)
get_id_range(sts, thread, &item, item.options, 0, &endid);
if (endid == 0)
endid = strtoul(row[4], 0, 10);
get_id_value(thread, item.tablename, endid, type, size, endval);
endvalue = endval;
} else {
// Get id for starttime // Get id for starttime
*sts = get_closest_time( *sts = get_closest_time(
thread, item.tablename, item.options, endtime, 0, &endid); thread, item.tablename, item.options, endtime, 0, &endid);
...@@ -2026,6 +2054,7 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid, ...@@ -2026,6 +2054,7 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid,
get_id_range(sts, thread, &item, item.options, 0, &endid); get_id_range(sts, thread, &item, item.options, 0, &endid);
if (endid == 0) if (endid == 0)
endid = strtoul(row[4], 0, 10); endid = strtoul(row[4], 0, 10);
}
} else } else
endid = strtoul(row[4], 0, 10); endid = strtoul(row[4], 0, 10);
...@@ -2312,13 +2341,20 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid, ...@@ -2312,13 +2341,20 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid,
} else } else
break; break;
} }
int bufrows = rows; int bufrows = rows + (startvalue != 0) + (endvalue != 0);
if (options & pwr_mSevOptionsMask_ReadOptimized) { if (options & pwr_mSevOptionsMask_ReadOptimized) {
*tbuf = (pwr_tTime*)calloc(bufrows, sizeof(pwr_tTime)); *tbuf = (pwr_tTime*)calloc(bufrows, sizeof(pwr_tTime));
*vbuf = calloc(bufrows, size); *vbuf = calloc(bufrows, size);
int bcnt = 0; int bcnt = 0;
if (startvalue) {
(*tbuf)[bcnt] = *starttime;
memcpy((*vbuf), startvalue, size);
bcnt++;
}
for (int i = 0; i < rows; i++) { for (int i = 0; i < rows; i++) {
int j = 0; int j = 0;
...@@ -2384,6 +2420,13 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid, ...@@ -2384,6 +2420,13 @@ int sev_dbms::get_values(pwr_tStatus* sts, void* thread, pwr_tOid oid,
// else // else
// printf( "%5d %5d %s %s\n", i, bcnt, row[0], row[1]); // printf( "%5d %5d %s %s\n", i, bcnt, row[0], row[1]);
} }
if (endvalue) {
(*tbuf)[bcnt] = *endtime;
memcpy(((char*)*vbuf) + bcnt * size, endvalue, size);
bcnt++;
}
printf("bcnt %d bufrows %d\n", bcnt, bufrows); printf("bcnt %d bufrows %d\n", bcnt, bufrows);
*bsize = bcnt; *bsize = bcnt;
mysql_free_result(result); mysql_free_result(result);
...@@ -3999,6 +4042,48 @@ int sev_dbms::get_closest_time(void* thread, char* tablename, ...@@ -3999,6 +4042,48 @@ int sev_dbms::get_closest_time(void* thread, char* tablename,
return 1; return 1;
} }
int sev_dbms::get_id_value(void* thread, char* tablename,
unsigned int id, pwr_eType type, int size,
void *value)
{
char query[200];
MYSQL* con;
if (thread)
con = (MYSQL*)thread;
else
con = m_env->con();
sprintf(query, "select value from %s where id = %d",
tablename, id);
int rc = mysql_query(con, query);
if (rc) {
printf("In %s row %d:\n", __FILE__, __LINE__);
printf("%s Query Error\n", __FUNCTION__);
return SEV__DBERROR;
}
MYSQL_ROW row;
MYSQL_RES* result = mysql_store_result(con);
if (!result) {
printf("In %s row %d:\n", __FILE__, __LINE__);
printf("%s Status Result Error\n", __FUNCTION__);
return SEV__DBERROR;
}
row = mysql_fetch_row(result);
if (!row) {
mysql_free_result(result);
return SEV__NOROWS;
} else
cdh_StringToAttrValue(type, row[0], value);
mysql_free_result(result);
return 1;
}
int sev_dbms::get_objectvalues(pwr_tStatus* sts, void* thread, sev_item* item, int sev_dbms::get_objectvalues(pwr_tStatus* sts, void* thread, sev_item* item,
unsigned int size, pwr_tTime* starttime, pwr_tTime* endtime, int maxsize, unsigned int size, pwr_tTime* starttime, pwr_tTime* endtime, int maxsize,
pwr_tTime** tbuf, void** vbuf, unsigned int* bsize) pwr_tTime** tbuf, void** vbuf, unsigned int* bsize)
......
...@@ -216,6 +216,9 @@ public: ...@@ -216,6 +216,9 @@ public:
void delete_thread(void* thread); void delete_thread(void* thread);
int get_closest_time(void* thread, char* tablename, unsigned int options, int get_closest_time(void* thread, char* tablename, unsigned int options,
pwr_tTime* time, int before, unsigned int* id); pwr_tTime* time, int before, unsigned int* id);
int get_id_value(void* thread, char* tablename,
unsigned int id, pwr_eType type, int size,
void *value);
void string_to_mysqlstring(char* in, char* out, int size); void string_to_mysqlstring(char* in, char* out, int size);
void mysqlstring_to_string(char* in, char* out, int size); void mysqlstring_to_string(char* in, char* out, int size);
int get_id_range(pwr_tStatus* sts, void* thread, sev_item* item, int get_id_range(pwr_tStatus* sts, void* thread, sev_item* item,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment