Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neoppod
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Levin Zimmermann
neoppod
Commits
b6bbc50d
Commit
b6bbc50d
authored
Mar 20, 2021
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
05be8f76
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
13 additions
and
14 deletions
+13
-14
go/zodb/storage/demo/demo.go
go/zodb/storage/demo/demo.go
+13
-14
No files found.
go/zodb/storage/demo/demo.go
View file @
b6bbc50d
...
@@ -71,21 +71,20 @@ func (e *baseMutatedError) Error() string {
...
@@ -71,21 +71,20 @@ func (e *baseMutatedError) Error() string {
return
fmt
.
Sprintf
(
"base.head mutated from @%s to @%s"
,
e
.
baseAt0
,
e
.
baseHead
)
return
fmt
.
Sprintf
(
"base.head mutated from @%s to @%s"
,
e
.
baseAt0
,
e
.
baseHead
)
}
}
// baseError
is reported on error
detected on Storage.base .
// baseError
Event is reported on error event
detected on Storage.base .
type
baseError
struct
{
type
baseError
Event
struct
{
err
error
err
error
}
}
func
(
e
*
baseError
)
Error
()
string
{
func
(
e
*
baseError
Event
)
Error
()
string
{
return
fmt
.
Sprintf
(
"base: error: %s"
,
e
.
err
)
return
fmt
.
Sprintf
(
"base: error
event
: %s"
,
e
.
err
)
}
}
func
(
e
*
baseError
)
Cause
()
error
{
return
e
.
err
}
func
(
e
*
baseError
Event
)
Cause
()
error
{
return
e
.
err
}
func
(
e
*
baseError
)
Unwrap
()
error
{
return
e
.
err
}
func
(
e
*
baseError
Event
)
Unwrap
()
error
{
return
e
.
err
}
// watcher detects base mutation and proxies δ events to user watchq.
// watcher task detects base mutation and proxies δ events to user watchq.
// it runs as separate goroutine.
func
(
d
*
Storage
)
watcher
(
ctx
context
.
Context
)
error
{
func
(
d
*
Storage
)
watcher
(
ctx
context
.
Context
)
error
{
if
d
.
watchq
!=
nil
{
if
d
.
watchq
!=
nil
{
defer
close
(
d
.
watchq
)
defer
close
(
d
.
watchq
)
...
@@ -97,7 +96,7 @@ func (d *Storage) watcher(ctx context.Context) error {
...
@@ -97,7 +96,7 @@ func (d *Storage) watcher(ctx context.Context) error {
case
<-
ctx
.
Done
()
:
case
<-
ctx
.
Done
()
:
return
ctx
.
Err
()
return
ctx
.
Err
()
// event on base -> base mutated + shutdown
// event on base -> base mutated
/error
+ shutdown
case
event
,
ok
:=
<-
d
.
baseWatchq
:
case
event
,
ok
:=
<-
d
.
baseWatchq
:
if
!
ok
{
if
!
ok
{
// base closed
// base closed
...
@@ -111,7 +110,7 @@ func (d *Storage) watcher(ctx context.Context) error {
...
@@ -111,7 +110,7 @@ func (d *Storage) watcher(ctx context.Context) error {
edown
=
&
baseMutatedError
{
d
.
baseAt0
,
event
.
Tid
}
edown
=
&
baseMutatedError
{
d
.
baseAt0
,
event
.
Tid
}
case
*
zodb
.
EventError
:
case
*
zodb
.
EventError
:
edown
=
&
baseError
{
event
.
Err
}
edown
=
&
baseError
Event
{
event
.
Err
}
default
:
default
:
edown
=
fmt
.
Errorf
(
"base: unexpected event %T"
,
event
)
edown
=
fmt
.
Errorf
(
"base: unexpected event %T"
,
event
)
...
@@ -138,7 +137,7 @@ func (d *Storage) watcher(ctx context.Context) error {
...
@@ -138,7 +137,7 @@ func (d *Storage) watcher(ctx context.Context) error {
}
}
}
}
// shutdown marks Storage as no longer
being
operational due to reason.
// shutdown marks Storage as no longer operational due to reason.
func
(
d
*
Storage
)
shutdown
(
reason
error
)
{
func
(
d
*
Storage
)
shutdown
(
reason
error
)
{
d
.
downOnce
.
Do
(
func
()
{
d
.
downOnce
.
Do
(
func
()
{
d
.
downErr
=
reason
d
.
downErr
=
reason
...
@@ -160,7 +159,7 @@ func (d *Storage) Close() (err error) {
...
@@ -160,7 +159,7 @@ func (d *Storage) Close() (err error) {
errδ
:=
d
.
δ
.
Close
()
errδ
:=
d
.
δ
.
Close
()
errBase
:=
d
.
base
.
Close
()
errBase
:=
d
.
base
.
Close
()
// cancel watcher
.
don't propagate its error to Close - watcher error
// cancel watcher
;
don't propagate its error to Close - watcher error
// goes to watchq and op errors.
// goes to watchq and op errors.
d
.
watchCancel
()
d
.
watchCancel
()
_
=
d
.
watchWG
.
Wait
()
_
=
d
.
watchWG
.
Wait
()
...
@@ -254,8 +253,8 @@ func (d *Storage) Load(ctx context.Context, xid zodb.Xid) (_ *mem.Buf, _ zodb.Ti
...
@@ -254,8 +253,8 @@ func (d *Storage) Load(ctx context.Context, xid zodb.Xid) (_ *mem.Buf, _ zodb.Ti
}
}
}
}
// cap
.at in xid
to .baseAt0 (we convert it back on error return, and
// cap
xid.At
to .baseAt0 (we convert it back on error return, and
// it makes more robust wrt simultaneous base mutation).
// it makes
Load
more robust wrt simultaneous base mutation).
xidBase
:=
xid
xidBase
:=
xid
if
xid
.
At
>
d
.
baseAt0
{
if
xid
.
At
>
d
.
baseAt0
{
xidBase
.
At
=
d
.
baseAt0
xidBase
.
At
=
d
.
baseAt0
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment