Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neoppod
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Levin Zimmermann
neoppod
Commits
e420f6f4
Commit
e420f6f4
authored
Feb 19, 2021
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
503c59b9
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
11 additions
and
12 deletions
+11
-12
go/neo/client.go
go/neo/client.go
+9
-10
go/neo/proto/error.go
go/neo/proto/error.go
+1
-1
go/neo/proto/proto.go
go/neo/proto/proto.go
+1
-1
No files found.
go/neo/client.go
View file @
e420f6f4
...
@@ -205,7 +205,7 @@ func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error {
...
@@ -205,7 +205,7 @@ func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error {
// syncMaster asks M for DB head right after identification.
// syncMaster asks M for DB head right after identification.
func
(
c
*
Client
)
syncMaster
(
ctx
context
.
Context
,
mlink
*
_MasterLink
)
(
err
error
)
{
func
(
c
*
Client
)
syncMaster
(
ctx
context
.
Context
,
mlink
*
_MasterLink
)
(
err
error
)
{
defer
task
.
Running
(
&
ctx
,
"sync0"
)(
&
err
)
//
XXX unify with Sync ?
defer
task
.
Running
(
&
ctx
,
"sync0"
)(
&
err
)
//
TODO try to unify with Sync
// query last_tid
// query last_tid
lastTxn
:=
proto
.
AnswerLastTransaction
{}
lastTxn
:=
proto
.
AnswerLastTransaction
{}
...
@@ -262,7 +262,7 @@ func (c *Client) flushEventq0() {
...
@@ -262,7 +262,7 @@ func (c *Client) flushEventq0() {
// Sync implements zodb.IStorageDriver.
// Sync implements zodb.IStorageDriver.
func
(
c
*
Client
)
Sync
(
ctx
context
.
Context
)
(
head
zodb
.
Tid
,
err
error
)
{
func
(
c
*
Client
)
Sync
(
ctx
context
.
Context
)
(
head
zodb
.
Tid
,
err
error
)
{
ctx
=
taskctx
.
Runningf
(
ctx
,
"%s: zsync"
,
c
.
node
.
MyInfo
.
NID
)
ctx
=
taskctx
.
Runningf
(
ctx
,
"%s: zsync"
,
c
.
node
.
MyInfo
.
NID
)
// XXX mynid locking
if
glog
.
V
(
2
)
{
if
glog
.
V
(
2
)
{
task
.
TraceBegin
(
ctx
)
task
.
TraceBegin
(
ctx
)
defer
func
()
{
task
.
TraceEnd
(
ctx
,
err
)
}()
defer
func
()
{
task
.
TraceEnd
(
ctx
,
err
)
}()
...
@@ -277,10 +277,9 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
...
@@ -277,10 +277,9 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
// XXX mlink can become down while we are making the call.
// XXX mlink can become down while we are making the call.
// XXX do we want to return error or retry?
// XXX do we want to return error or retry?
reply
:=
proto
.
AnswerLastTransaction
{}
reply
:=
proto
.
AnswerLastTransaction
{}
err
=
mlink
.
Ask1
(
&
proto
.
LastTransaction
{},
&
reply
)
// XXX
Ask += ctx
err
=
mlink
.
Ask1
(
&
proto
.
LastTransaction
{},
&
reply
)
// XXX
ctx cancel
if
err
!=
nil
{
if
err
!=
nil
{
// XXX ZODBErrDecode?
return
err
// NOTE no need for ZODBErrDecode
return
err
}
}
head
=
reply
.
Tid
head
=
reply
.
Tid
return
nil
return
nil
...
@@ -290,7 +289,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
...
@@ -290,7 +289,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
// Load implements zodb.IStorageDriver.
// Load implements zodb.IStorageDriver.
func
(
c
*
Client
)
Load
(
ctx
context
.
Context
,
xid
zodb
.
Xid
)
(
buf
*
mem
.
Buf
,
serial
zodb
.
Tid
,
err
error
)
{
func
(
c
*
Client
)
Load
(
ctx
context
.
Context
,
xid
zodb
.
Xid
)
(
buf
*
mem
.
Buf
,
serial
zodb
.
Tid
,
err
error
)
{
ctx
=
taskctx
.
Runningf
(
ctx
,
"%s: zload %s"
,
c
.
node
.
MyInfo
.
NID
,
xid
)
// XXX nid locking
ctx
=
taskctx
.
Runningf
(
ctx
,
"%s: zload %s"
,
c
.
node
.
MyInfo
.
NID
,
xid
)
// XXX
my
nid locking
if
glog
.
V
(
2
)
{
if
glog
.
V
(
2
)
{
task
.
TraceBegin
(
ctx
)
task
.
TraceBegin
(
ctx
)
defer
func
()
{
task
.
TraceEnd
(
ctx
,
err
)
}()
defer
func
()
{
task
.
TraceEnd
(
ctx
,
err
)
}()
...
@@ -324,9 +323,9 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
...
@@ -324,9 +323,9 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
return
nil
,
0
,
errors
.
Errorf
(
"internal inconsistency: cluster is operational, but no storages alive for oid %s"
,
xid
.
Oid
)
return
nil
,
0
,
errors
.
Errorf
(
"internal inconsistency: cluster is operational, but no storages alive for oid %s"
,
xid
.
Oid
)
}
}
//
XXX vvv temp stub -> TODO pick up 3 random storages and send load
//
TODO pick up 3 random storages and send load requests to them all,
//
requests to them all, getting the first who is the fastest to reply;
//
getting the first who is the fastest to reply; retry from the
//
retry from the beginning if all are found to fail?
//
beginning if all are found to fail.
stor
:=
storv
[
rand
.
Intn
(
len
(
storv
))]
stor
:=
storv
[
rand
.
Intn
(
len
(
storv
))]
slink
,
err
:=
stor
.
Dial
(
ctx
)
slink
,
err
:=
stor
.
Dial
(
ctx
)
...
@@ -335,7 +334,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
...
@@ -335,7 +334,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
}
}
// close accept after dialed (not to deadlock if S decides to send us
// close accept after dialed (not to deadlock if S decides to send us
// something).
// something).
slink
.
CloseAccept
()
//
XXX
need to close only after first real dial
slink
.
CloseAccept
()
//
NOTE
need to close only after first real dial
// on the wire it comes as "before", not "at"
// on the wire it comes as "before", not "at"
req
:=
proto
.
GetObject
{
req
:=
proto
.
GetObject
{
...
...
go/neo/proto/error.go
View file @
e420f6f4
...
@@ -35,7 +35,7 @@ import (
...
@@ -35,7 +35,7 @@ import (
// receiver side they can be recreated with ErrDecode. If err is zodb.OpError,
// receiver side they can be recreated with ErrDecode. If err is zodb.OpError,
// only its inner cause is encoded.
// only its inner cause is encoded.
//
//
// If err is not ZODB error -> it is
i
ncoded as "503".
// If err is not ZODB error -> it is
e
ncoded as "503".
func
ZODBErrEncode
(
err
error
)
*
Error
{
func
ZODBErrEncode
(
err
error
)
*
Error
{
e
,
ok
:=
err
.
(
*
zodb
.
OpError
)
e
,
ok
:=
err
.
(
*
zodb
.
OpError
)
if
ok
{
if
ok
{
...
...
go/neo/proto/proto.go
View file @
e420f6f4
...
@@ -442,7 +442,7 @@ type Pong struct{}
...
@@ -442,7 +442,7 @@ type Pong struct{}
type
CloseClient
struct
{
type
CloseClient
struct
{
}
}
// Ask node identier of the current primary master.
// Ask node identi
fi
er of the current primary master.
//
//
//neo:nodes ctl -> A
//neo:nodes ctl -> A
type
PrimaryMaster
struct
{
type
PrimaryMaster
struct
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment