Commit a41e0b4d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7025320e
......@@ -372,7 +372,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
var serveConn func(context.Context, *neo.Conn)
switch nodeInfo.NodeType {
case neo.CLIENT:
serveConn = stor.ServeClient
serveConn = stor.serveClient
default:
// XXX vvv should be reply to peer
......@@ -388,7 +388,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
break
}
// XXX wrap conn close to happen here, not in ServeClient ?
// XXX wrap conn close to happen here, not in serveClient ?
go serveConn(ctx, conn)
}
......@@ -396,9 +396,9 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
}
// ServeClient serves incoming connection on which peer identified itself as client
// serveClient serves incoming connection on which peer identified itself as client
// XXX +error return?
func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
fmt.Printf("stor: %s: serving new client conn\n", conn)
// rederive ctx to be also cancelled if M tells us StopOperation
......@@ -418,62 +418,67 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *neo.Conn) {
}()
for {
req, err := conn.Recv()
if err != nil {
return // XXX log / err / send error before closing
}
serveClient1(conn) // XXX err check
}
}
switch req := req.(type) {
case *neo.GetObject:
xid := zodb.Xid{Oid: req.Oid}
if req.Serial != neo.INVALID_TID {
xid.Tid = req.Serial
xid.TidBefore = false
} else {
xid.Tid = req.Tid
xid.TidBefore = true
}
// serveClient1 serves 1 request from a client
func (stor *Storage) serveClient1(conn *neo.Conn) error {
req, err := conn.Recv()
if err != nil {
return // XXX log / err / send error before closing
}
var reply neo.Msg
data, tid, err := stor.zstor.Load(xid)
if err != nil {
// TODO translate err to NEO protocol error codes
reply = neo.ErrEncode(err)
} else {
reply = &neo.AnswerGetObject{
Oid: xid.Oid,
Serial: tid,
Compression: false,
Data: data,
// XXX .CheckSum
// XXX .NextSerial
// XXX .DataSerial
}
}
switch req := req.(type) {
case *neo.GetObject:
xid := zodb.Xid{Oid: req.Oid}
if req.Serial != neo.INVALID_TID {
xid.Tid = req.Serial
xid.TidBefore = false
} else {
xid.Tid = req.Tid
xid.TidBefore = true
}
conn.Send(reply) // XXX err
var reply neo.Msg
data, tid, err := stor.zstor.Load(xid)
if err != nil {
// TODO translate err to NEO protocol error codes
reply = neo.ErrEncode(err)
} else {
reply = &neo.AnswerGetObject{
Oid: xid.Oid,
Serial: tid,
Compression: false,
Data: data,
// XXX .CheckSum
// XXX .NextSerial
// XXX .DataSerial
}
}
case *neo.LastTransaction:
var reply neo.Msg
conn.Send(reply) // XXX err
lastTid, err := stor.zstor.LastTid()
if err != nil {
reply = neo.ErrEncode(err)
} else {
reply = &neo.AnswerLastTransaction{lastTid}
}
case *neo.LastTransaction:
var reply neo.Msg
conn.Send(reply) // XXX err
lastTid, err := stor.zstor.LastTid()
if err != nil {
reply = neo.ErrEncode(err)
} else {
reply = &neo.AnswerLastTransaction{lastTid}
}
//case *ObjectHistory:
//case *StoreObject:
conn.Send(reply) // XXX err
default:
panic("unexpected packet") // XXX
}
//case *ObjectHistory:
//case *StoreObject:
//req.Put(...)
default:
panic("unexpected packet") // XXX
}
//req.Put(...)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment