Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Labels
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Commits
Open sidebar
nexedi
cython
Commits
0474ac90
Commit
0474ac90
authored
Aug 12, 2020
by
Xavier Thompson
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Change cypclass 'Object Invocation Lock' implementation
parent
aa7fcada
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
238 additions
and
260 deletions
+238
-260
Cython/Compiler/Builtin.py
Cython/Compiler/Builtin.py
+1
-1
Cython/Compiler/CypclassTransforms.py
Cython/Compiler/CypclassTransforms.py
+0
-5
Cython/Compiler/ExprNodes.py
Cython/Compiler/ExprNodes.py
+4
-1
Cython/Compiler/ModuleNode.py
Cython/Compiler/ModuleNode.py
+19
-14
Cython/Compiler/Nodes.py
Cython/Compiler/Nodes.py
+5
-8
Cython/Utility/CyObjects.cpp
Cython/Utility/CyObjects.cpp
+209
-231
No files found.
Cython/Compiler/Builtin.py
View file @
0474ac90
...
...
@@ -676,7 +676,7 @@ def inject_cypclass_refcount_macros():
def
inject_cypclass_lock_macros
():
blocking_macro_type
=
PyrexTypes
.
CFuncType
(
PyrexTypes
.
c_void_type
,
[
PyrexTypes
.
CFuncTypeArg
(
"obj"
,
PyrexTypes
.
cy_object_type
,
None
)],
nogil
=
1
)
for
macro
in
(
"Cy_RLOCK"
,
"Cy_WLOCK"
,
"Cy_UNLOCK"
):
for
macro
in
(
"Cy_RLOCK"
,
"Cy_WLOCK"
,
"Cy_UN
WLOCK"
,
"Cy_UNR
LOCK"
):
builtin_scope
.
declare_builtin_cfunction
(
macro
,
blocking_macro_type
,
macro
)
nonblocking_macro_type
=
PyrexTypes
.
CFuncType
(
PyrexTypes
.
c_int_type
,
[
PyrexTypes
.
CFuncTypeArg
(
"obj"
,
PyrexTypes
.
cy_object_type
,
None
)],
nogil
=
1
)
for
macro
in
(
"Cy_TRYRLOCK"
,
"Cy_TRYWLOCK"
):
...
...
Cython/Compiler/CypclassTransforms.py
View file @
0474ac90
...
...
@@ -465,11 +465,6 @@ class CypclassLockTransform(Visitor.EnvTransform):
self
.
transform
.
rlocked
[
entry
]
+=
1
elif
state
==
'wlocked'
:
self
.
transform
.
wlocked
[
entry
]
+=
1
elif
state
==
'unlocked'
:
if
self
.
rlocked
>
0
:
self
.
transform
.
rlocked
[
entry
]
-=
1
elif
self
.
wlocked
>
0
:
self
.
transform
.
wlocked
[
entry
]
-=
1
def
__exit__
(
self
,
*
args
):
entry
=
self
.
entry
...
...
Cython/Compiler/ExprNodes.py
View file @
0474ac90
...
...
@@ -14007,7 +14007,10 @@ class CoerceToLockedTempNode(CoerceToTempNode):
code
.
putln
(
"Cy_WLOCK(%s);"
%
self
.
result
())
def
generate_disposal_code
(
self
,
code
):
code
.
putln
(
"Cy_UNLOCK(%s);"
%
self
.
result
())
if
self
.
rlock_only
:
code
.
putln
(
"Cy_UNRLOCK(%s);"
%
self
.
result
())
else
:
code
.
putln
(
"Cy_UNWLOCK(%s);"
%
self
.
result
())
super
(
CoerceToLockedTempNode
,
self
).
generate_disposal_code
(
code
)
...
...
Cython/Compiler/ModuleNode.py
View file @
0474ac90
...
...
@@ -926,7 +926,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def
generate_cyp_class_deferred_definitions
(
self
,
env
,
code
,
definition
):
"""
Generate all cypclass method definitions, deferred till now
Generate all cypclass method definitions, deferred till now
.
"""
for
entry
,
scope
in
env
.
iter_cypclass_entries_and_scopes
():
...
...
@@ -948,7 +948,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def
generate_cyp_class_attrs_destructor_definition
(
self
,
entry
,
code
):
"""
Generate destructor definition for the given cypclass entry
Generate destructor definition for the given cypclass entry
.
"""
scope
=
entry
.
type
.
scope
...
...
@@ -966,7 +966,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def
generate_cyp_class_activate_function
(
self
,
entry
,
code
):
"""
Generate activate function for activable cypclass entries
Generate activate function for activable cypclass entries
.
"""
active_self_entry
=
entry
.
type
.
scope
.
lookup_here
(
"<active_self>"
)
...
...
@@ -1016,7 +1016,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def
generate_cyp_class_activated_class
(
self
,
entry
,
code
):
"""
Generate activated c
lass
Generate activated c
ypclass.
"""
from
.
import
Builtin
...
...
@@ -1085,7 +1085,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
code
.
putln
(
"if (this->%s != NULL) {"
%
queue_attr_cname
)
code
.
putln
(
"Cy_WLOCK(%s);"
%
queue_attr_cname
)
code
.
putln
(
"this->%s->push(message);"
%
queue_attr_cname
)
code
.
putln
(
"Cy_UNLOCK(%s);"
%
queue_attr_cname
)
code
.
putln
(
"Cy_UN
W
LOCK(%s);"
%
queue_attr_cname
)
code
.
putln
(
"} else {"
)
code
.
putln
(
"/* We should definitely shout here */"
)
code
.
putln
(
'fprintf(stderr, "Acthon error: No queue to push to for %s remote call !
\
\
n");'
%
reified_function_entry
.
name
)
...
...
@@ -1098,7 +1098,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def
generate_cyp_class_reifying_entries
(
self
,
entry
,
code
):
"""
Generate code to reify the cypclass entr
y ? -> TODO what does this do exactly ?
Generate code to reify the cypclass entr
ies.
"""
target_object_type
=
entry
.
type
...
...
@@ -1217,7 +1217,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
code
.
putln
(
"if (this->%s != NULL) {"
%
sync_attr_cname
)
code
.
putln
(
"if (!Cy_TRYRLOCK(this->%s)) {"
%
sync_attr_cname
)
code
.
putln
(
"%s = this->%s->isActivable();"
%
(
sync_result
,
sync_attr_cname
))
code
.
putln
(
"Cy_UNLOCK(this->%s);"
%
sync_attr_cname
)
code
.
putln
(
"Cy_UN
R
LOCK(this->%s);"
%
sync_attr_cname
)
code
.
putln
(
"}"
)
code
.
putln
(
"if (%s == 0) return 0;"
%
sync_result
)
code
.
putln
(
"}"
)
...
...
@@ -1297,19 +1297,22 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
num_unlock
=
0
# Target object first, then arguments
code
.
putln
(
"if (%s > %s) {"
%
(
trylock_result
,
num_unlock
))
code
.
putln
(
"Cy_UNLOCK(this->%s);"
%
target_object_cname
)
unlock_op
=
"Cy_UNRLOCK"
if
reified_function_entry
.
type
.
is_const_method
else
"Cy_UNWLOCK"
code
.
putln
(
"%s(this->%s);"
%
(
unlock_op
,
target_object_cname
))
num_unlock
+=
1
for
i
,
narg
in
enumerate
(
func_type
.
args
[:
narg_count
]):
if
narg
.
type
.
is_cyp_class
:
code
.
putln
(
"if (%s > %s) {"
%
(
trylock_result
,
num_unlock
))
code
.
putln
(
"Cy_UNLOCK(this->%s);"
%
narg
.
cname
)
unlock_op
=
"Cy_UNRLOCK"
if
narg
.
type
.
is_const
else
"Cy_UNWLOCK"
code
.
putln
(
"%s(this->%s);"
%
(
unlock_op
,
narg
.
cname
))
num_unlock
+=
1
if
opt_arg_count
and
num_optional_if
:
code
.
putln
(
"if (this->%s != NULL) {"
%
opt_arg_name
)
for
opt_idx
,
optarg
in
enumerate
(
func_type
.
args
[
narg_count
:]):
if
optarg
.
type
.
is_cyp_class
:
code
.
putln
(
"if (%s > %s) {"
%
(
trylock_result
,
num_unlock
))
code
.
putln
(
"Cy_UNLOCK(this->%s->%s);"
%
(
opt_arg_name
,
func_type
.
opt_arg_cname
(
optarg
.
name
)))
unlock_op
=
"Cy_UNRLOCK"
if
optarg
.
type
.
is_const
else
"Cy_UNWLOCK"
code
.
putln
(
"%s(this->%s->%s);"
%
(
unlock_op
,
opt_arg_name
,
func_type
.
opt_arg_cname
(
optarg
.
name
)))
num_unlock
+=
1
# Note: we do not respect the semantic order of end-blocks here for simplification purpose.
# This one is for the "not NULL opt arg" check
...
...
@@ -1330,8 +1333,10 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
", "
.
join
(
"this->%s"
%
arg_cname
for
arg_cname
in
reified_call_args_list
)
)
)
code
.
putln
(
"Cy_UNLOCK(this->%s);"
%
target_object_cname
)
put_cypclass_op_on_narg_optarg
(
lambda
_
:
"Cy_UNLOCK"
,
reified_function_entry
.
type
,
Naming
.
optional_args_cname
,
code
)
unlock_op
=
"Cy_UNRLOCK"
if
reified_function_entry
.
type
.
is_const_method
else
"Cy_UNWLOCK"
code
.
putln
(
"%s(this->%s);"
%
(
unlock_op
,
target_object_cname
))
arg_unlocker
=
lambda
arg
:
"Cy_UNRLOCK"
if
arg
.
type
.
is_const
else
"Cy_UNWLOCK"
put_cypclass_op_on_narg_optarg
(
arg_unlocker
,
reified_function_entry
.
type
,
Naming
.
optional_args_cname
,
code
)
code
.
putln
(
"/* Push result in the result object */"
)
if
does_return
:
code
.
putln
(
"Cy_WLOCK(this->%s);"
%
result_attr_cname
)
...
...
@@ -1339,7 +1344,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
code
.
putln
(
"this->%s->pushIntResult(result);"
%
result_attr_cname
)
else
:
code
.
putln
(
"this->%s->pushVoidStarResult((void*)result);"
%
result_attr_cname
)
code
.
putln
(
"Cy_UNLOCK(this->%s);"
%
result_attr_cname
)
code
.
putln
(
"Cy_UN
W
LOCK(this->%s);"
%
result_attr_cname
)
code
.
putln
(
"return 1;"
)
code
.
putln
(
"}"
)
...
...
@@ -1354,7 +1359,7 @@ class ModuleNode(Nodes.Node, Nodes.BlockNode):
def
generate_cyp_class_wrapper_definition
(
self
,
type
,
wrapper_entry
,
constructor_entry
,
new_entry
,
alloc_entry
,
code
):
"""
Generate
cypclass constructor wrapper ? -> TODO what does this do exactly ?
Generate
the cypclass constructor wrapper.
"""
if
type
.
templates
:
...
...
Cython/Compiler/Nodes.py
View file @
0474ac90
...
...
@@ -8546,7 +8546,7 @@ class EnsureGILNode(GILExitNode):
code
.
put_ensure_gil
(
declare_gilstate
=
False
)
class
LockCypclassNode
(
StatNode
):
# 'with rlocked / wlocked [cypclass object]
' or 'with unlocked [cypclass object]' statement
# 'with rlocked / wlocked [cypclass object]
#
# state string 'rlocked' or 'wlocked' or 'unlocked'
# obj ExprNode the (un)locked object
...
...
@@ -8574,13 +8574,10 @@ class LockCypclassNode(StatNode):
self
.
body
.
generate_execution_code
(
code
)
# We must unlock if we held a lock previously, and relock if we unlocked.
if
self
.
state
!=
"unlocked"
:
code
.
putln
(
"Cy_UNLOCK(%s);"
%
self
.
obj
.
result
())
elif
self
.
was_wlocked
:
code
.
putln
(
"Cy_WLOCK(%s);"
%
self
.
obj
.
result
())
elif
self
.
was_rlocked
:
code
.
putln
(
"Cy_RLOCK(%s);"
%
self
.
obj
.
result
())
if
self
.
state
==
"rlocked"
:
code
.
putln
(
"Cy_UNRLOCK(%s);"
%
self
.
obj
.
result
())
elif
self
.
state
==
"wlocked"
:
code
.
putln
(
"Cy_UNWLOCK(%s);"
%
self
.
obj
.
result
())
def
cython_view_utility_code
():
...
...
Cython/Utility/CyObjects.cpp
View file @
0474ac90
...
...
@@ -14,8 +14,14 @@
#ifdef __cplusplus
#if __cplusplus >= 201103L
#include <atomic>
#include <cstdint>
using
namespace
std
;
#define CyObject_ATOMIC_REFCOUNT_TYPE atomic_int
#define CyObject_NO_OWNER -1
#define CyObject_MANY_OWNERS -2
#define CyObject_MAX_READERS (1 << 30)
#define CyObject_LOCK_ERROR_OTHER_WRITER (1 << 0)
#define CyObject_LOCK_ERROR_OTHER_READER (1 << 1)
#include <pthread.h>
...
...
@@ -28,32 +34,26 @@
#include <type_traits>
struct
ThreadStorage
{
pid_t
thread_id
;
unsigned
int
read_count
;
unsigned
int
write_count
;
};
class
RecursiveUpgradeableRWLock
{
pthread_rwlock_t
rw_lock
;
pthread_mutex_t
upgrade_lock
;
// Notes: This could be a rw_lock
pthread_mutex_t
thread_count_lock
;
std
::
vector
<
ThreadStorage
>
thread_count
;
protected:
ThreadStorage
&
get_or_init_thread_count
(
pid_t
thread_id
);
pthread_mutex_t
guard
;
pthread_cond_t
wait_readers_depart
;
pthread_cond_t
wait_writer_depart
;
atomic
<
pid_t
>
owner_id
;
atomic_int32_t
readers_nb
;
uint32_t
write_count
;
public:
RecursiveUpgradeableRWLock
()
{
pthread_
rwlock_init
(
&
this
->
rw_lock
,
NULL
);
pthread_
mutex_init
(
&
this
->
upgrade_lock
,
NULL
);
pthread_mutex_init
(
&
this
->
thread_count_lock
,
NULL
)
;
// Reserve space for up to 8 threads
this
->
thread_count
.
reserve
(
8
)
;
RecursiveUpgradeableRWLock
()
{
pthread_mutex_init
(
&
this
->
guard
,
NULL
);
pthread_
cond_init
(
&
this
->
wait_readers_depart
,
NULL
);
pthread_
cond_init
(
&
this
->
wait_writer_depart
,
NULL
);
this
->
owner_id
=
CyObject_NO_OWNER
;
this
->
readers_nb
=
0
;
this
->
write_count
=
0
;
}
void
wlock
();
void
rlock
();
void
unlock
();
void
unwlock
();
void
unrlock
();
int
tryrlock
();
int
trywlock
();
};
...
...
@@ -65,7 +65,6 @@
class
CyObject
:
public
CyPyObject
{
private:
CyObject_ATOMIC_REFCOUNT_TYPE
nogil_ob_refcnt
;
//pthread_rwlock_t ob_lock;
RecursiveUpgradeableRWLock
ob_lock
;
public:
CyObject
()
:
nogil_ob_refcnt
(
1
)
{}
...
...
@@ -75,7 +74,8 @@
int
CyObject_GETREF
();
void
CyObject_RLOCK
();
void
CyObject_WLOCK
();
void
CyObject_UNLOCK
();
void
CyObject_UNRLOCK
();
void
CyObject_UNWLOCK
();
int
CyObject_TRYRLOCK
();
int
CyObject_TRYWLOCK
();
};
...
...
@@ -138,62 +138,71 @@
static
inline
void
Cy_INCREF
(
T
)
{}
template
<
typename
T
,
typename
std
::
enable_if
<
std
::
is_convertible
<
T
,
CyObject
*
>
::
value
,
int
>::
type
=
0
>
static
inline
void
Cy_DECREF
(
T
&
o
p
)
{
if
(
o
p
->
CyObject_DECREF
())
o
p
=
NULL
;
static
inline
void
Cy_DECREF
(
T
&
o
b
)
{
if
(
o
b
->
CyObject_DECREF
())
o
b
=
NULL
;
}
template
<
typename
T
,
typename
std
::
enable_if
<
std
::
is_convertible
<
T
,
CyObject
*
>
::
value
,
int
>::
type
=
0
>
static
inline
void
Cy_XDECREF
(
T
&
o
p
)
{
if
(
o
p
!=
NULL
)
{
if
(
o
p
->
CyObject_DECREF
())
o
p
=
NULL
;
static
inline
void
Cy_XDECREF
(
T
&
o
b
)
{
if
(
o
b
!=
NULL
)
{
if
(
o
b
->
CyObject_DECREF
())
o
b
=
NULL
;
}
}
template
<
typename
T
,
typename
std
::
enable_if
<
std
::
is_convertible
<
T
,
CyObject
*
>
::
value
,
int
>::
type
=
0
>
static
inline
void
Cy_INCREF
(
T
o
p
)
{
if
(
o
p
!=
NULL
)
o
p
->
CyObject_INCREF
();
static
inline
void
Cy_INCREF
(
T
o
b
)
{
if
(
o
b
!=
NULL
)
o
b
->
CyObject_INCREF
();
}
static
inline
int
_Cy_GETREF
(
CyObject
*
o
p
)
{
return
o
p
->
CyObject_GETREF
();
static
inline
int
_Cy_GETREF
(
CyObject
*
o
b
)
{
return
o
b
->
CyObject_GETREF
();
}
static
inline
void
_Cy_RLOCK
(
CyObject
*
o
p
)
{
if
(
o
p
!=
NULL
)
{
o
p
->
CyObject_RLOCK
();
static
inline
void
_Cy_RLOCK
(
CyObject
*
o
b
)
{
if
(
o
b
!=
NULL
)
{
o
b
->
CyObject_RLOCK
();
}
else
{
fprintf
(
stderr
,
"ERROR: trying to read lock NULL !
\n
"
);
}
}
static
inline
void
_Cy_WLOCK
(
CyObject
*
o
p
)
{
if
(
o
p
!=
NULL
)
{
o
p
->
CyObject_WLOCK
();
static
inline
void
_Cy_WLOCK
(
CyObject
*
o
b
)
{
if
(
o
b
!=
NULL
)
{
o
b
->
CyObject_WLOCK
();
}
else
{
fprintf
(
stderr
,
"ERROR: trying to write lock NULL !
\n
"
);
}
}
static
inline
void
_Cy_UNLOCK
(
CyObject
*
op
)
{
if
(
op
!=
NULL
)
{
op
->
CyObject_UNLOCK
();
static
inline
void
_Cy_UNRLOCK
(
CyObject
*
ob
)
{
if
(
ob
!=
NULL
)
{
ob
->
CyObject_UNRLOCK
();
}
else
{
fprintf
(
stderr
,
"ERROR: trying to unrlock NULL !
\n
"
);
}
}
static
inline
void
_Cy_UNWLOCK
(
CyObject
*
ob
)
{
if
(
ob
!=
NULL
)
{
ob
->
CyObject_UNWLOCK
();
}
else
{
fprintf
(
stderr
,
"ERROR: trying to unlock NULL !
\n
"
);
fprintf
(
stderr
,
"ERROR: trying to un
w
lock NULL !
\n
"
);
}
}
static
inline
int
_Cy_TRYRLOCK
(
CyObject
*
o
p
)
{
return
o
p
->
CyObject_TRYRLOCK
();
static
inline
int
_Cy_TRYRLOCK
(
CyObject
*
o
b
)
{
return
o
b
->
CyObject_TRYRLOCK
();
}
static
inline
int
_Cy_TRYWLOCK
(
CyObject
*
o
p
)
{
return
o
p
->
CyObject_TRYWLOCK
();
static
inline
int
_Cy_TRYWLOCK
(
CyObject
*
o
b
)
{
return
o
b
->
CyObject_TRYWLOCK
();
}
/*
...
...
@@ -269,18 +278,19 @@
/* Cast argument to CyObject* type. */
#define _CyObject_CAST(op) op
#define Cy_GETREF(op) (_Cy_GETREF(_CyObject_CAST(op)))
#define Cy_GOTREF(op)
#define Cy_XGOTREF(op)
#define Cy_GIVEREF(op)
#define Cy_XGIVEREF(op)
#define Cy_RLOCK(op) _Cy_RLOCK(op)
#define Cy_WLOCK(op) _Cy_WLOCK(op)
#define Cy_UNLOCK(op) _Cy_UNLOCK(op)
#define Cy_TRYRLOCK(op) _Cy_TRYRLOCK(op)
#define Cy_TRYWLOCK(op) _Cy_TRYWLOCK(op)
#define _CyObject_CAST(ob) ob
#define Cy_GETREF(ob) (_Cy_GETREF(_CyObject_CAST(ob)))
#define Cy_GOTREF(ob)
#define Cy_XGOTREF(ob)
#define Cy_GIVEREF(ob)
#define Cy_XGIVEREF(ob)
#define Cy_RLOCK(ob) _Cy_RLOCK(ob)
#define Cy_WLOCK(ob) _Cy_WLOCK(ob)
#define Cy_UNRLOCK(ob) _Cy_UNRLOCK(ob)
#define Cy_UNWLOCK(ob) _Cy_UNWLOCK(ob)
#define Cy_TRYRLOCK(ob) _Cy_TRYRLOCK(ob)
#define Cy_TRYWLOCK(ob) _Cy_TRYWLOCK(ob)
#endif
#endif
...
...
@@ -297,200 +307,164 @@
#endif
/* __cplusplus */
void
RecursiveUpgradeableRWLock
::
rlock
()
{
pid_t
caller_id
=
syscall
(
SYS_gettid
);
ThreadStorage
&
RecursiveUpgradeableRWLock
::
get_or_init_thread_count
(
pid_t
thread_id
)
{
int
first_empty_index
=
-
1
;
int
match_index
=
-
1
;
pthread_mutex_lock
(
&
this
->
thread_count_lock
);
for
(
unsigned
int
i
=
0
;
i
<
this
->
thread_count
.
size
();
++
i
)
{
if
(
this
->
thread_count
[
i
].
thread_id
==
thread_id
)
match_index
=
i
;
if
(
first_empty_index
<
0
&&
this
->
thread_count
[
i
].
thread_id
==
0
)
first_empty_index
=
i
;
if
(
this
->
owner_id
==
caller_id
)
{
++
this
->
readers_nb
;
return
;
}
if
(
match_index
<
0
)
{
// We must get a new entry. The question is: do we have to reallocate space ?
// First, create the temporary entry
ThreadStorage
tmp_thread_entry
;
tmp_thread_entry
.
thread_id
=
thread_id
;
tmp_thread_entry
.
read_count
=
0
;
tmp_thread_entry
.
write_count
=
0
;
if
(
first_empty_index
<
0
)
{
// We have to reallocate space
match_index
=
this
->
thread_count
.
size
();
this
->
thread_count
.
push_back
(
tmp_thread_entry
);
}
else
{
// We can reuse an existing and empty cell
match_index
=
first_empty_index
;
this
->
thread_count
[
match_index
]
=
tmp_thread_entry
;
}
pthread_mutex_lock
(
&
this
->
guard
);
while
(
this
->
write_count
>
0
)
{
pthread_cond_wait
(
&
this
->
wait_writer_depart
,
&
this
->
guard
);
}
pthread_mutex_unlock
(
&
this
->
thread_count_lock
);
return
this
->
thread_count
[
match_index
];
}
this
->
owner_id
=
this
->
readers_nb
++
?
CyObject_MANY_OWNERS
:
caller_id
;
void
RecursiveUpgradeableRWLock
::
wlock
()
{
pid_t
my_tid
=
syscall
(
SYS_gettid
);
ThreadStorage
&
my_counts
=
this
->
get_or_init_thread_count
(
my_tid
);
bool
has_read_lock
=
my_counts
.
read_count
;
bool
has_write_lock
=
my_counts
.
write_count
;
int
mutex_trylock_error
=
-
1
;
if
(
!
has_write_lock
)
{
if
(
has_read_lock
)
{
mutex_trylock_error
=
pthread_mutex_trylock
(
&
this
->
upgrade_lock
);
// As you may have noticed, this is a trylock above, not a blocking lock.
// This is because we could generate a deadlock:
// Imagine 2 threads T1 and T2, both holding a read lock on the same lock.
// Now, T1 tries to upgrade. So it holds the mutex, then unlock it's read lock,
// then tries to take a write lock. As T2 still has a read lock, T1 blocks,
// waiting for T2 to release it's read lock.
// Now, imagine that, instead of releasing, T2 tries to upgrade.
// It will first try to take the mutex. And won't succeed, as T1 holds it.
// This annoying mutex is here to avoid snatching when upgrading.
// Indeed, if you imagine T1 holding a read lock, T1 tries to upgrade,
// and right after T3 tries to write-lock (from nothing).
// As T1 is releasing then taking the write lock, T3 could take the write lock
// before T1, which is not really what's intented for an upgradable lock.
// The strategy here is to allow an "all is right" case by trying to lock
// first in a non-blocking manner. If it succeeds, hurray, our lock
// won't be snatched, we can continue by releasing the read lock.
// If it doesn't, to avoid a potential deadlock, we first release the read lock
// then try to hold the mutex again. Our lock will be snatched.
// So, in either case, we unlock the read lock here.
pthread_rwlock_unlock
(
&
this
->
rw_lock
);
}
if
(
mutex_trylock_error
!=
0
)
// Two cases: failed upgrading, or trying to acquire a write lock without previous lock.
// In both situations, we're trying here to acquire a write lock from nothing,
// as we already dropped read lock in the failed upgrading case,
// so blocking is allowed here (can't deadlock as we don't own other locks)
pthread_mutex_lock
(
&
this
->
upgrade_lock
);
pthread_rwlock_wrlock
(
&
this
->
rw_lock
);
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
}
// If we already have the write lock we directly jump here
++
my_counts
.
write_count
;
pthread_mutex_unlock
(
&
this
->
guard
);
}
void
RecursiveUpgradeableRWLock
::
rlock
()
{
pid_t
my_tid
=
syscall
(
SYS_gettid
);
int
RecursiveUpgradeableRWLock
::
tryrlock
()
{
pid_t
caller_id
=
syscall
(
SYS_gettid
);
if
(
this
->
owner_id
==
caller_id
)
{
++
this
->
readers_nb
;
return
0
;
}
ThreadStorage
&
my_counts
=
this
->
get_or_init_thread_count
(
my_tid
);
bool
has_read_lock
=
my_counts
.
read_count
;
bool
has_write_lock
=
my_counts
.
write_count
;
// we must lock here, because a trylock could fail also when another thread is currently read-locking or read-unlocking
// but this means we might miss a writer arriving and leaving
pthread_mutex_lock
(
&
this
->
guard
)
;
if
(
!
has_write_lock
&&
!
has_read_lock
)
{
pthread_mutex_lock
(
&
this
->
upgrade_lock
);
pthread_rwlock_rdlock
(
&
this
->
rw_lock
);
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
if
(
this
->
write_count
>
0
)
{
return
CyObject_LOCK_ERROR_OTHER_WRITER
;
pthread_mutex_unlock
(
&
this
->
guard
);
}
// If we already have a lock (read or write), we directly jump here
++
my_counts
.
read_count
;
this
->
owner_id
=
this
->
readers_nb
++
?
CyObject_MANY_OWNERS
:
caller_id
;
pthread_mutex_unlock
(
&
this
->
guard
);
return
0
;
}
void
RecursiveUpgradeableRWLock
::
unlock
()
{
pid_t
my_tid
=
syscall
(
SYS_gettid
);
void
RecursiveUpgradeableRWLock
::
unrlock
()
{
ThreadStorage
&
my_counts
=
this
->
get_or_init_thread_count
(
my_tid
);
bool
has_read_lock
=
my_counts
.
read_count
;
bool
has_write_lock
=
my_counts
.
write_count
;
pthread_mutex_lock
(
&
this
->
guard
);
if
(
has_read_lock
)
{
--
my_counts
.
read_count
;
}
else
if
(
has_write_lock
)
{
--
my_counts
.
write_count
;
}
else
{
fprintf
(
stderr
,
"ERROR: trying to unlock already unlocked CyObject !
\n
"
);
return
;
}
if
(
!
my_counts
.
write_count
&&
!
my_counts
.
read_count
)
{
pthread_rwlock_unlock
(
&
this
->
rw_lock
);
my_counts
.
thread_id
=
0
;
if
(
--
this
->
readers_nb
==
0
)
{
if
(
this
->
write_count
==
0
)
{
this
->
owner_id
=
CyObject_NO_OWNER
;
}
// broadcast to wake up all the waiting writers
pthread_cond_broadcast
(
&
this
->
wait_readers_depart
);
}
pthread_mutex_unlock
(
&
this
->
guard
);
}
int
RecursiveUpgradeableRWLock
::
tryrlock
()
{
int
rw_trylock_error
;
pid_t
my_tid
=
syscall
(
SYS_gettid
);
ThreadStorage
&
my_counts
=
this
->
get_or_init_thread_count
(
my_tid
);
bool
has_read_lock
=
my_counts
.
read_count
;
bool
has_write_lock
=
my_counts
.
write_count
;
if
(
!
has_write_lock
&&
!
has_read_lock
)
{
pthread_mutex_lock
(
&
this
->
upgrade_lock
);
rw_trylock_error
=
pthread_rwlock_tryrdlock
(
&
this
->
rw_lock
);
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
if
(
rw_trylock_error
)
return
rw_trylock_error
;
void
RecursiveUpgradeableRWLock
::
wlock
()
{
pid_t
caller_id
=
syscall
(
SYS_gettid
);
if
(
this
->
owner_id
==
caller_id
)
{
if
(
this
->
write_count
)
{
++
this
->
write_count
;
return
;
}
}
++
my_counts
.
read_count
;
return
0
;
pthread_mutex_lock
(
&
this
->
guard
);
if
(
this
->
owner_id
!=
caller_id
)
{
// we wait for readers first and bottleneck the writers after
// this works for a readers preferring approach
// because the other way around would require setting write_count to 1 before waiting on the readers
// in order to ensure only one writer at a times goes beyound waiting for the other writers to leave
// this would be more in line with a writers preferring approach, but that can deadlock when a thread recurses on a readlock
// this also means that we must broadcast when the readers leave in order to give all the waiting writers a chance to wake up
// new readers might still pass before some of the waiting writers, but then they'll just wait some more
// OTOH, if the new readers come, not broadcasting would let some writers wait forever
while
(
this
->
readers_nb
>
0
)
{
pthread_cond_wait
(
&
this
->
wait_readers_depart
,
&
this
->
guard
);
}
while
(
this
->
write_count
>
0
)
{
pthread_cond_wait
(
&
this
->
wait_writer_depart
,
&
this
->
guard
);
}
this
->
owner_id
=
caller_id
;
}
this
->
write_count
=
1
;
pthread_mutex_unlock
(
&
this
->
guard
);
}
int
RecursiveUpgradeableRWLock
::
trywlock
()
{
int
rw_trylock_error
;
int
mutex_trylock_error
;
pid_t
my_tid
=
syscall
(
SYS_gettid
);
ThreadStorage
&
my_counts
=
this
->
get_or_init_thread_count
(
my_tid
);
bool
has_read_lock
=
my_counts
.
read_count
;
bool
has_write_lock
=
my_counts
.
write_count
;
if
(
!
has_write_lock
)
{
if
(
has_read_lock
)
{
mutex_trylock_error
=
pthread_mutex_trylock
(
&
this
->
upgrade_lock
);
if
(
mutex_trylock_error
)
{
// In contrast to the blocking write lock,
// if we fail here we do want to keep the read lock.
return
mutex_trylock_error
;
}
// Here, we have the lock -> try to upgrade
pthread_rwlock_unlock
(
&
this
->
rw_lock
);
rw_trylock_error
=
pthread_rwlock_trywrlock
(
&
this
->
rw_lock
);
if
(
rw_trylock_error
)
{
// Get the read lock again. As we have the mutex, no one
// is trying to upgrade nor to acquire a lock,
// so the call here should return immediately
pthread_rwlock_rdlock
(
&
this
->
rw_lock
);
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
return
rw_trylock_error
;
}
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
pid_t
caller_id
=
syscall
(
SYS_gettid
);
if
(
this
->
owner_id
==
caller_id
)
{
if
(
this
->
write_count
)
{
++
this
->
write_count
;
return
0
;
}
mutex_trylock_error
=
pthread_mutex_trylock
(
&
this
->
upgrade_lock
);
if
(
mutex_trylock_error
)
{
// Keep previous state, so we will indeed keep read-lock
// if we had one.
return
mutex_trylock_error
;
}
if
(
pthread_mutex_trylock
(
&
this
->
guard
)
!=
0
)
{
// another thread is currently doing a lock operation on this lock, but we don't know what it is.
// we could choose to do a blocking lock instead of a trylock here.
// that way we would not detect when an unlock overlaps with this trylock,
// but also all the contending readers and / or writers could leave while we block,
// so we wouldn't detect those contentions either.
return
CyObject_LOCK_ERROR_OTHER_WRITER
|
CyObject_LOCK_ERROR_OTHER_READER
;
}
if
(
this
->
owner_id
!=
caller_id
)
{
if
(
this
->
readers_nb
>
0
)
{
pthread_mutex_unlock
(
&
this
->
guard
);
return
CyObject_LOCK_ERROR_OTHER_READER
;
}
if
(
has_read_lock
)
pthread_rwlock_unlock
(
&
this
->
rw_lock
);
rw_trylock_error
=
pthread_rwlock_trywrlock
(
&
this
->
rw_lock
);
if
(
rw_trylock_error
)
{
if
(
has_read_lock
)
{
// Get the read lock again. As we have the mutex, no one
// is trying to upgrade nor to acquire a lock,
// so the call here should return immediately
pthread_rwlock_rdlock
(
&
this
->
rw_lock
);
}
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
return
rw_trylock_error
;
if
(
this
->
write_count
>
0
)
{
pthread_mutex_unlock
(
&
this
->
guard
);
return
CyObject_LOCK_ERROR_OTHER_WRITER
;
}
pthread_mutex_unlock
(
&
this
->
upgrade_lock
);
this
->
owner_id
=
caller_id
;
}
++
my_counts
.
write_count
;
this
->
write_count
=
1
;
pthread_mutex_unlock
(
&
this
->
guard
);
return
0
;
}
void
RecursiveUpgradeableRWLock
::
unwlock
()
{
pthread_mutex_lock
(
&
this
->
guard
);
if
(
--
this
->
write_count
==
0
)
{
if
(
this
->
readers_nb
==
0
)
{
this
->
owner_id
=
CyObject_NO_OWNER
;
}
// broadcast to wake up all the waiting readers, + maybe one waiting writer
// more efficient to count r waiting readers and w waiting writers and signal n + (w > 0) times
pthread_cond_broadcast
(
&
this
->
wait_writer_depart
);
}
pthread_mutex_unlock
(
&
this
->
guard
);
}
void
CyObject
::
CyObject_INCREF
()
{
atomic_fetch_add
(
&
(
this
->
nogil_ob_refcnt
),
1
);
...
...
@@ -520,11 +494,6 @@ void CyObject::CyObject_WLOCK()
this
->
ob_lock
.
wlock
();
}
void
CyObject
::
CyObject_UNLOCK
()
{
this
->
ob_lock
.
unlock
();
}
int
CyObject
::
CyObject_TRYRLOCK
()
{
return
this
->
ob_lock
.
tryrlock
();
...
...
@@ -534,6 +503,15 @@ int CyObject::CyObject_TRYWLOCK()
{
return
this
->
ob_lock
.
trywlock
();
}
void
CyObject
::
CyObject_UNRLOCK
()
{
this
->
ob_lock
.
unrlock
();
}
void
CyObject
::
CyObject_UNWLOCK
()
{
this
->
ob_lock
.
unwlock
();
}
ActhonMessageInterface
::
ActhonMessageInterface
(
ActhonSyncInterface
*
sync_method
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment