Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
erp5
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Labels
Merge Requests
137
Merge Requests
137
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Jobs
Commits
Open sidebar
nexedi
erp5
Commits
433d07fd
Commit
433d07fd
authored
1 year ago
by
Jérome Perrin
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
CMFActivity py3
parent
03f2cbfa
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
24 additions
and
23 deletions
+24
-23
product/CMFActivity/Activity/SQLBase.py
product/CMFActivity/Activity/SQLBase.py
+12
-12
product/CMFActivity/Activity/SQLDict.py
product/CMFActivity/Activity/SQLDict.py
+2
-2
product/CMFActivity/tests/testCMFActivity.py
product/CMFActivity/tests/testCMFActivity.py
+10
-9
No files found.
product/CMFActivity/Activity/SQLBase.py
View file @
433d07fd
...
...
@@ -140,10 +140,10 @@ def sqltest_dict():
if
value
is
None
:
# XXX: see comment in SQLBase._getMessageList
return
column
+
b" IS NULL"
for
x
in
value
:
return
b"%s IN (%s)"
%
(
column
,
str2bytes
(
', '
.
join
(
map
(
return
str2bytes
(
"%s IN (%s)"
%
(
column
,
', '
.
join
(
map
(
str
if
isinstance
(
x
,
_SQLTEST_NO_QUOTE_TYPE_SET
)
else
render_datetime
if
isinstance
(
x
,
DateTime
)
else
render_string
,
value
))))
lambda
v
:
bytes2str
(
render_string
(
v
))
,
value
))))
return
b"0"
sqltest_dict
[
name
]
=
render
_
(
'active_process_uid'
)
...
...
@@ -245,7 +245,7 @@ def getNow(db):
Note that this value is not cached, and is not transactionnal on MySQL
side.
"""
return
db
.
query
(
"SELECT UTC_TIMESTAMP(6)"
,
0
)[
1
][
0
][
0
]
return
db
.
query
(
b
"SELECT UTC_TIMESTAMP(6)"
,
0
)[
1
][
0
][
0
]
class
SQLBase
(
Queue
):
"""
...
...
@@ -283,7 +283,7 @@ CREATE TABLE %s (
db
=
activity_tool
.
getSQLConnection
()
create
=
self
.
createTableSQL
()
if
clear
:
db
.
query
(
"DROP TABLE IF EXISTS "
+
self
.
sql_table
)
db
.
query
(
str2bytes
(
"DROP TABLE IF EXISTS "
+
self
.
sql_table
)
)
db
.
query
(
create
)
else
:
src
=
db
.
upgradeSchema
(
create
,
create_if_not_exists
=
1
,
...
...
@@ -832,8 +832,8 @@ CREATE TABLE %s (
"""
Put messages back in given processing_node.
"""
db
.
query
(
"UPDATE %s SET processing_node=%s WHERE uid IN (%s)
\
0
COMMIT"
%
(
self
.
sql_table
,
state
,
','
.
join
(
map
(
str
,
uid_list
))))
db
.
query
(
(
"UPDATE %s SET processing_node=%s WHERE uid IN (%s)
\
0
COMMIT"
%
(
self
.
sql_table
,
state
,
','
.
join
(
map
(
str
,
uid_list
))))
.
encode
())
def
getProcessableMessageLoader
(
self
,
db
,
processing_node
):
# do not merge anything
...
...
@@ -1040,16 +1040,16 @@ CREATE TABLE %s (
return
not
message_list
def
deleteMessageList
(
self
,
db
,
uid_list
):
db
.
query
(
"DELETE FROM %s WHERE uid IN (%s)"
%
(
self
.
sql_table
,
','
.
join
(
map
(
str
,
uid_list
))))
db
.
query
(
str2bytes
(
"DELETE FROM %s WHERE uid IN (%s)"
%
(
self
.
sql_table
,
','
.
join
(
map
(
str
,
uid_list
))))
)
def
reactivateMessageList
(
self
,
db
,
uid_list
,
delay
,
retry
):
db
.
query
(
"UPDATE %s SET"
db
.
query
(
str2bytes
(
"UPDATE %s SET"
" date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)"
"%s WHERE uid IN (%s)"
%
(
self
.
sql_table
,
delay
,
", retry = retry + 1"
if
retry
else
""
,
","
.
join
(
map
(
str
,
uid_list
))))
","
.
join
(
map
(
str
,
uid_list
))))
)
def
finalizeMessageExecution
(
self
,
activity_tool
,
message_list
,
uid_to_duplicate_uid_list_dict
=
None
):
...
...
@@ -1206,8 +1206,8 @@ CREATE TABLE %s (
To simulate time shift, we simply substract delay from
all dates in message(_queue) table
"""
activity_tool
.
getSQLConnection
().
query
(
"UPDATE %s SET"
activity_tool
.
getSQLConnection
().
query
(
(
"UPDATE %s SET"
" date = DATE_SUB(date, INTERVAL %s SECOND)"
%
(
self
.
sql_table
,
delay
)
+
(
''
if
processing_node
is
None
else
"WHERE processing_node=%s"
%
processing_node
))
"WHERE processing_node=%s"
%
processing_node
))
.
encode
())
This diff is collapsed.
Click to expand it.
product/CMFActivity/Activity/SQLDict.py
View file @
433d07fd
...
...
@@ -142,10 +142,10 @@ class SQLDict(SQLBase):
if reserve_uid_list:
self.assignMessageList(db, processing_node, reserve_uid_list)
else:
db.query("COMMIT") # XXX: useful ?
db.query(
b
"COMMIT") # XXX: useful ?
except:
self._log(WARNING, '
Failed
to
reserve
duplicates
')
db.query("ROLLBACK")
db.query(
b
"ROLLBACK")
raise
if uid_list:
self._log(TRACE, '
Reserved
duplicate
messages
:
%
r' % uid_list)
...
...
This diff is collapsed.
Click to expand it.
product/CMFActivity/tests/testCMFActivity.py
View file @
433d07fd
...
...
@@ -619,7 +619,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Monkey patch Queue to induce conflict errors artificially.
def
query
(
self
,
query_string
,
*
args
,
**
kw
):
# Not so nice, this is specific to zsql method
if
"REPLACE INTO"
in
query_string
:
if
b
"REPLACE INTO"
in
query_string
:
raise
OperationalError
return
self
.
original_query
(
query_string
,
*
args
,
**
kw
)
...
...
@@ -1026,7 +1026,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
"""
activity_tool
=
self
.
getActivityTool
()
def
delete_volatiles
():
for
property_id
in
activity_tool
.
__dict__
.
keys
(
):
for
property_id
in
list
(
six
.
iterkeys
(
activity_tool
.
__dict__
)
):
if
property_id
.
startswith
(
'_v_'
):
delattr
(
activity_tool
,
property_id
)
organisation_module
=
self
.
getOrganisationModule
()
...
...
@@ -1142,6 +1142,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self
.
flushAllActivities
(
silent
=
1
,
loop_size
=
100
)
# Check there is a traceback in the email notification
sender
,
recipients
,
mail
=
message_list
.
pop
()
mail
=
mail
.
decode
()
self
.
assertIn
(
"Module %s, line %s, in failingMethod"
%
(
__name__
,
inspect
.
getsourcelines
(
failingMethod
)[
1
]),
mail
)
self
.
assertIn
(
"ValueError:"
,
mail
)
...
...
@@ -1237,7 +1238,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Check that cmf_activity SQL connection still works
connection_da
=
self
.
portal
.
cmf_activity_sql_connection
()
self
.
assertFalse
(
connection_da
.
_registered
)
connection_da
.
query
(
'select 1'
)
connection_da
.
query
(
b
'select 1'
)
self
.
assertTrue
(
connection_da
.
_registered
)
self
.
commit
()
self
.
assertFalse
(
connection_da
.
_registered
)
...
...
@@ -1693,7 +1694,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# This is a one-shot method, revert after execution
SQLDict
.
dequeueMessage
=
original_dequeue
result
=
self
.
dequeueMessage
(
activity_tool
,
processing_node
,
node_family_id_set
)
queue_tic_test_dict
[
'is
Alive'
]
=
process_shutdown_thread
.
isA
live
()
queue_tic_test_dict
[
'is
_alive'
]
=
process_shutdown_thread
.
is_a
live
()
return
result
SQLDict
.
dequeueMessage
=
dequeueMessage
Organisation
.
waitingActivity
=
waitingActivity
...
...
@@ -1717,7 +1718,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool
.
tic
()
activity_thread
=
ActivityThread
()
# Do not try to outlive main thread.
activity_thread
.
setDaemon
(
True
)
activity_thread
.
daemon
=
True
# Call process_shutdown in yet another thread because it will wait for
# running activity to complete before returning, and we need to unlock
# activity *after* calling process_shutdown to make sure the next
...
...
@@ -1727,7 +1728,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool
.
process_shutdown
(
3
,
0
)
process_shutdown_thread
=
ProcessShutdownThread
()
# Do not try to outlive main thread.
process_shutdown_thread
.
setDaemon
(
True
)
process_shutdown_thread
.
daemon
=
True
activity_thread
.
start
()
# Wait at rendez-vous for activity to arrive.
...
...
@@ -1746,7 +1747,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self
.
assertEqual
(
len
(
message_list
),
1
)
self
.
assertEqual
(
message_list
[
0
].
method_id
,
'getTitle'
)
# Check that process_shutdown_thread was still runing when Queue_tic returned.
self
.
assertTrue
(
queue_tic_test_dict
.
get
(
'is
A
live'
),
repr
(
queue_tic_test_dict
))
self
.
assertTrue
(
queue_tic_test_dict
.
get
(
'is
_a
live'
),
repr
(
queue_tic_test_dict
))
# Call tic in foreground. This must not lead to activity execution.
activity_tool
.
tic
()
self
.
assertEqual
(
len
(
activity_tool
.
getMessageList
()),
1
)
...
...
@@ -1894,7 +1895,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
"""
original_query
=
six
.
get_unbound_function
(
DB
.
query
)
def
query
(
self
,
query_string
,
*
args
,
**
kw
):
if
query_string
.
startswith
(
'INSERT'
):
if
query_string
.
startswith
(
b
'INSERT'
):
insert_list
.
append
(
len
(
query_string
))
if
not
n
:
raise
Skip
...
...
@@ -2490,7 +2491,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self
.
assertEqual
(
1
,
activity_tool
.
countMessage
())
self
.
flushAllActivities
()
sender
,
recipients
,
mail
=
message_list
.
pop
()
self
.
assertIn
(
'UID mismatch'
,
mail
)
self
.
assertIn
(
b
'UID mismatch'
,
mail
)
m
,
=
activity_tool
.
getMessageList
()
self
.
assertEqual
(
m
.
processing_node
,
INVOKE_ERROR_STATE
)
obj
.
flushActivity
()
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment