Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
erp5
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Levin Zimmermann
erp5
Commits
d25efe94
Commit
d25efe94
authored
Apr 06, 2022
by
Arnaud Fontaine
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
py3: _mysql.string_literal() returns bytes().
parent
246a589a
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
63 additions
and
62 deletions
+63
-62
product/CMFActivity/Activity/SQLBase.py
product/CMFActivity/Activity/SQLBase.py
+50
-50
product/CMFActivity/Activity/SQLDict.py
product/CMFActivity/Activity/SQLDict.py
+11
-11
product/CMFActivity/ActivityTool.py
product/CMFActivity/ActivityTool.py
+2
-1
No files found.
product/CMFActivity/Activity/SQLBase.py
View file @
d25efe94
...
...
@@ -110,14 +110,14 @@ def SQLLock(db, lock_name, timeout):
"""
lock_name
=
db
.
string_literal
(
lock_name
)
query
=
db
.
query
(
_
,
((
acquired
,
),
))
=
query
(
'SELECT GET_LOCK(%s, %f)'
%
(
lock_name
,
timeout
))
(
_
,
((
acquired
,
),
))
=
query
(
b
'SELECT GET_LOCK(%s, %f)'
%
(
lock_name
,
timeout
))
if
acquired
is
None
:
raise
ValueError
(
'Error acquiring lock'
)
try
:
yield
acquired
finally
:
if
acquired
:
query
(
'SELECT RELEASE_LOCK(%s)'
%
(
lock_name
,
))
query
(
b
'SELECT RELEASE_LOCK(%s)'
%
(
lock_name
,
))
# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def
sqltest_dict
():
...
...
@@ -161,13 +161,13 @@ def sqltest_dict():
assert
isinstance
(
priority
,
_SQLTEST_NO_QUOTE_TYPE_SET
)
assert
isinstance
(
uid
,
_SQLTEST_NO_QUOTE_TYPE_SET
)
return
(
'(priority>%(priority)s OR (priority=%(priority)s
AND '
'(date>%(date)s OR (date=%(date)s AND uid>%(uid)s
))'
'))'
%
{
'priority'
:
priority
,
b'(priority>%(priority)d OR (priority=%(priority)d
AND '
b'(date>%(date)s OR (date=%(date)s AND uid>%(uid)d
))'
b
'))'
%
{
b
'priority'
:
priority
,
# render_datetime raises if "date" lacks date API, so no need to check
'date'
:
render_string
(
render_datetime
(
date
)),
'uid'
:
uid
,
b
'date'
:
render_string
(
render_datetime
(
date
)),
b
'uid'
:
uid
,
}
)
sqltest_dict
[
'above_priority_date_uid'
]
=
renderAbovePriorityDateUid
...
...
@@ -178,7 +178,7 @@ def _validate_after_path_and_method_id(value, render_string):
path
,
method_id
=
value
return
(
sqltest_dict
[
'method_id'
](
method_id
,
render_string
)
+
' AND '
+
b
' AND '
+
sqltest_dict
[
'path'
](
path
,
render_string
)
)
...
...
@@ -186,7 +186,7 @@ def _validate_after_tag_and_method_id(value, render_string):
tag
,
method_id
=
value
return
(
sqltest_dict
[
'method_id'
](
method_id
,
render_string
)
+
' AND '
+
b
' AND '
+
sqltest_dict
[
'tag'
](
tag
,
render_string
)
)
...
...
@@ -403,17 +403,17 @@ CREATE TABLE %s (
def
hasActivitySQL
(
self
,
quote
,
only_valid
=
False
,
only_invalid
=
False
,
**
kw
):
where
=
[
sqltest_dict
[
k
](
v
,
quote
)
for
(
k
,
v
)
in
kw
.
items
()
if
v
]
if
only_valid
:
where
.
append
(
'processing_node > %d'
%
INVOKE_ERROR_STATE
)
where
.
append
(
b
'processing_node > %d'
%
INVOKE_ERROR_STATE
)
if
only_invalid
:
where
.
append
(
'processing_node <= %d'
%
INVOKE_ERROR_STATE
)
return
"SELECT 1 FROM %s WHERE %s LIMIT 1"
%
(
self
.
sql_table
,
" AND "
.
join
(
where
)
or
"1"
)
where
.
append
(
b
'processing_node <= %d'
%
INVOKE_ERROR_STATE
)
return
b
"SELECT 1 FROM %s WHERE %s LIMIT 1"
%
(
self
.
sql_table
.
encode
(),
b" AND "
.
join
(
where
)
or
b
"1"
)
def
getPriority
(
self
,
activity_tool
,
processing_node
,
node_set
=
None
):
if
node_set
is
None
:
q
=
(
"SELECT 3*priority, date FROM %s"
" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1"
%
self
.
sql_table
)
q
=
(
b
"SELECT 3*priority, date FROM %s"
b
" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
b" ORDER BY priority, date LIMIT 1"
%
self
.
sql_table
.
encode
()
)
else
:
subquery
=
(
"(SELECT 3*priority{} as effective_priority, date FROM %s"
" WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
...
...
@@ -421,12 +421,12 @@ CREATE TABLE %s (
node
=
'node=%s'
%
processing_node
# "ALL" on all but one, to incur deduplication cost only once.
# "UNION ALL" between the two naturally distinct sets.
q
=
(
"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
" ORDER BY effective_priority, date LIMIT 1"
%
(
subquery
(
-
1
,
node
),
subquery
(
''
,
'node=0'
),
subquery
(
'+IF(node, IF(%s, -1, 1), 0)'
%
node
,
'node>=0'
),
' UNION ALL '
+
subquery
(
-
1
,
'node IN (%s)'
%
','
.
join
(
map
(
str
,
node_set
)))
if
node_set
else
''
,
q
=
(
b
"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
b
" ORDER BY effective_priority, date LIMIT 1"
%
(
subquery
(
-
1
,
node
)
.
encode
()
,
subquery
(
''
,
'node=0'
)
.
encode
()
,
subquery
(
'+IF(node, IF(%s, -1, 1), 0)'
%
node
,
'node>=0'
)
.
encode
()
,
b' UNION ALL '
+
subquery
(
-
1
,
'node IN (%s)'
%
','
.
join
(
map
(
str
,
node_set
))).
encode
()
if
node_set
else
b
''
,
))
result
=
activity_tool
.
getSQLConnection
().
query
(
q
,
0
)[
1
]
if
result
:
...
...
@@ -599,18 +599,18 @@ CREATE TABLE %s (
if
len
(
column_list
)
==
1
else
_IDENTITY
)
base_sql_suffix
=
' WHERE processing_node > %i AND (%%s) LIMIT 1)'
%
(
base_sql_suffix
=
b
' WHERE processing_node > %i AND (%%s) LIMIT 1)'
%
(
min_processing_node
,
)
sql_suffix_list
=
[
base_sql_suffix
%
to_sql
(
dependency_value
,
quote
)
for
dependency_value
in
dependency_value_dict
]
base_sql_prefix
=
'(SELECT %s FROM '
%
(
','
.
join
(
column_list
),
base_sql_prefix
=
b
'(SELECT %s FROM '
%
(
b','
.
join
([
c
.
encode
()
for
c
in
column_list
]
),
)
subquery_list
=
[
base_sql_prefix
+
table_name
+
sql_suffix
base_sql_prefix
+
table_name
.
encode
()
+
sql_suffix
for
table_name
in
table_name_list
for
sql_suffix
in
sql_suffix_list
]
...
...
@@ -621,7 +621,7 @@ CREATE TABLE %s (
# by the number of activty tables: it is also proportional to the
# number of distinct values being looked for in the current column.
for
row
in
db
.
query
(
' UNION '
.
join
(
subquery_list
[
_MAX_DEPENDENCY_UNION_SUBQUERY_COUNT
:]),
b
' UNION '
.
join
(
subquery_list
[
_MAX_DEPENDENCY_UNION_SUBQUERY_COUNT
:]),
max_rows
=
0
,
)[
1
]:
# Each row is a value which blocks some activities.
...
...
@@ -695,9 +695,9 @@ CREATE TABLE %s (
assert
limit
quote
=
db
.
string_literal
query
=
db
.
query
args
=
(
self
.
sql_table
,
sqltest_dict
[
'to_date'
](
date
,
quote
),
' AND group_method_id='
+
quote
(
group_method_id
)
if
group_method_id
else
''
,
limit
)
args
=
(
self
.
sql_table
.
encode
()
,
sqltest_dict
[
'to_date'
](
date
,
quote
),
b
' AND group_method_id='
+
quote
(
group_method_id
)
if
group_method_id
else
b
''
,
limit
)
# Note: Not all write accesses to our table are protected by this lock.
# This lock is not here for data consistency reasons, but to avoid wasting
...
...
@@ -726,25 +726,25 @@ CREATE TABLE %s (
# time).
if
node_set
is
None
:
result
=
Results
(
query
(
"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s
FOR UPDATE"
%
args
,
0
))
b
"SELECT * FROM %s WHERE processing_node=0 AND %s%s"
b" ORDER BY priority, date LIMIT %d
FOR UPDATE"
%
args
,
0
))
else
:
# We'd like to write
# ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
# but this makes indices inefficient.
subquery
=
(
"(SELECT *, 3*priority{}
as effective_priority FROM %s"
" WHERE {}
AND processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE)"
%
args
).
format
node
=
'node=%s
'
%
processing_node
subquery
=
(
b"(SELECT *, 3*priority%%s
as effective_priority FROM %s"
b" WHERE %%s
AND processing_node=0 AND %s%s"
b" ORDER BY priority, date LIMIT %d FOR UPDATE)"
%
args
)
node
=
b'node=%d
'
%
processing_node
result
=
Results
(
query
(
# "ALL" on all but one, to incur deduplication cost only once.
# "UNION ALL" between the two naturally distinct sets.
"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
" ORDER BY effective_priority, date LIMIT %s
"
%
(
subquery
(
-
1
,
node
),
subquery
(
''
,
'node=0'
),
subquery
(
'+IF(node, IF(%s, -1, 1), 0)'
%
node
,
'node>=0'
),
' UNION ALL '
+
subquery
(
-
1
,
'node IN (%s)'
%
','
.
join
(
map
(
str
,
node_set
)))
if
node_set
else
''
,
b
"SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
b" ORDER BY effective_priority, date LIMIT %d
"
%
(
subquery
%
(
b'-1'
,
node
),
subquery
%
(
b''
,
b
'node=0'
),
subquery
%
(
b'+IF(node, IF(%s, -1, 1), 0)'
%
node
,
b
'node>=0'
),
b' UNION ALL '
+
subquery
%
(
str
(
-
1
),
b'node IN (%s)'
%
b','
.
join
(
map
(
str
,
node_set
)).
encode
())
if
node_set
else
b
''
,
limit
),
0
))
if
result
:
# Reserve messages.
...
...
@@ -807,9 +807,9 @@ CREATE TABLE %s (
# To minimize the probability of deadlocks, we also COMMIT so that a
# new transaction starts on the first 'FOR UPDATE' query, which is all
# the more important as the current on started with getPriority().
result
=
db
.
query
(
"SELECT * FROM %s WHERE processing_node=%s
"
" ORDER BY priority, date LIMIT 1
\
0
COMMIT"
%
(
self
.
sql_table
,
processing_node
),
0
)
result
=
db
.
query
(
b"SELECT * FROM %s WHERE processing_node=%d
"
b
" ORDER BY priority, date LIMIT 1
\
0
COMMIT"
%
(
self
.
sql_table
.
encode
()
,
processing_node
),
0
)
already_assigned
=
result
[
1
]
if
already_assigned
:
result
=
Results
(
result
)
...
...
@@ -838,10 +838,10 @@ CREATE TABLE %s (
cost
*=
count
# Retrieve objects which have the same group method.
result
=
iter
(
already_assigned
and
Results
(
db
.
query
(
"SELECT * FROM %s"
" WHERE processing_node=%s
AND group_method_id=%s"
" ORDER BY priority, date LIMIT %s
"
%
(
self
.
sql_table
,
processing_node
,
and
Results
(
db
.
query
(
b
"SELECT * FROM %s"
b" WHERE processing_node=%d
AND group_method_id=%s"
b" ORDER BY priority, date LIMIT %d
"
%
(
self
.
sql_table
.
encode
()
,
processing_node
,
db
.
string_literal
(
group_method_id
),
limit
),
0
))
# Do not optimize rare case: keep the code simple by not
# adding more results from getReservedMessageList if the
...
...
product/CMFActivity/Activity/SQLDict.py
View file @
d25efe94
...
...
@@ -86,7 +86,7 @@ class SQLDict(SQLBase):
uid
=
line
.
uid
original_uid
=
path_and_method_id_dict
.
get
(
key
)
if
original_uid
is
None
:
sql_method_id
=
" AND method_id = %s AND group_method_id = %s"
%
(
sql_method_id
=
b
" AND method_id = %s AND group_method_id = %s"
%
(
quote
(
method_id
),
quote
(
line
.
group_method_id
))
m
=
Message
.
load
(
line
.
message
,
uid
=
uid
,
line
=
line
)
merge_parent
=
m
.
activity_kw
.
get
(
'merge_parent'
)
...
...
@@ -103,11 +103,11 @@ class SQLDict(SQLBase):
uid_list
=
[]
if
path_list
:
# Select parent messages.
result
=
Results
(
db
.
query
(
"SELECT * FROM message"
" WHERE processing_node IN (0, %s
) AND path IN (%s)%s"
" ORDER BY path LIMIT 1 FOR UPDATE"
%
(
result
=
Results
(
db
.
query
(
b
"SELECT * FROM message"
b" WHERE processing_node IN (0, %d
) AND path IN (%s)%s"
b
" ORDER BY path LIMIT 1 FOR UPDATE"
%
(
processing_node
,
','
.
join
(
map
(
quote
,
path_list
)),
b
','
.
join
(
map
(
quote
,
path_list
)),
sql_method_id
,
),
0
))
if
result
:
# found a parent
...
...
@@ -120,11 +120,11 @@ class SQLDict(SQLBase):
m
=
Message
.
load
(
line
.
message
,
uid
=
uid
,
line
=
line
)
# return unreserved similar children
path
=
line
.
path
result
=
db
.
query
(
"SELECT uid FROM message"
" WHERE processing_node = 0 AND (path = %s OR path LIKE %s)"
"%s FOR UPDATE"
%
(
result
=
db
.
query
(
b
"SELECT uid FROM message"
b
" WHERE processing_node = 0 AND (path = %s OR path LIKE %s)"
b
"%s FOR UPDATE"
%
(
quote
(
path
),
quote
(
path
.
replace
(
'_'
,
r'\
_
') + '
/%
'),
sql_method_id,
sql_method_id
.encode()
,
), 0)[1]
reserve_uid_list = [x for x, in result]
uid_list += reserve_uid_list
...
...
@@ -133,8 +133,8 @@ class SQLDict(SQLBase):
reserve_uid_list.append(uid)
else:
# Select duplicates.
result = db.query("SELECT uid FROM message"
" WHERE processing_node = 0 AND path = %s%s FOR UPDATE" % (
result = db.query(
b
"SELECT uid FROM message"
b
" WHERE processing_node = 0 AND path = %s%s FOR UPDATE" % (
quote(path), sql_method_id,
), 0)[1]
reserve_uid_list = uid_list = [x for x, in result]
...
...
product/CMFActivity/ActivityTool.py
View file @
d25efe94
...
...
@@ -1321,6 +1321,7 @@ class ActivityTool (BaseTool):
# Catch ALL exception to avoid killing timerserver.
LOG
(
'ActivityTool'
,
ERROR
,
'process_timer received an exception'
,
error
=
True
)
import
pdb
;
pdb
.
post_mortem
()
finally
:
setSecurityManager
(
old_sm
)
finally
:
...
...
@@ -1394,7 +1395,7 @@ class ActivityTool (BaseTool):
path
=
None
if
obj
is
None
else
'/'
.
join
(
obj
.
getPhysicalPath
())
db
=
self
.
getSQLConnection
()
quote
=
db
.
string_literal
return
bool
(
db
.
query
(
"(%s)"
%
") UNION ALL ("
.
join
(
return
bool
(
db
.
query
(
b"(%s)"
%
b
") UNION ALL ("
.
join
(
activity
.
hasActivitySQL
(
quote
,
path
=
path
,
**
kw
)
for
activity
in
activity_dict
.
values
()))[
1
])
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment