Commit 2aa27ef9 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/zeo: Teach zlink to serve incoming notification and requests

This will be used in the next patch to handle invalidateTransaction
messages.
parent 63d322b9
......@@ -46,6 +46,9 @@ type zeo struct {
// driver client <- watcher: database commits | errors.
watchq chan<- zodb.Event // FIXME stub
// becomes ready when serve loop finishes
serveWG sync.WaitGroup
url string // we were opened via this
}
......@@ -351,6 +354,21 @@ func openByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (_ zodb
z := &zeo{link: zlink, watchq: opt.Watchq, url: url}
// start serve loop on the link
z.serveWG.Add(1)
go func() {
defer z.serveWG.Done()
err := zlink.Serve(
// notifyTab
nil,
// serveTab
nil,
)
_ = err
}()
// call register
rpc := z.rpc("register")
xlastTid, err := rpc.call(ctx, storageID, opt.ReadOnly)
if err != nil {
......@@ -406,6 +424,7 @@ func (z *zeo) Close() error {
if z.watchq != nil {
close(z.watchq)
}
z.serveWG.Wait()
return err
}
......
......@@ -47,22 +47,34 @@ var protoVersions = []string{
}
// zLink is ZEO connection between client (local end) and server (remote end).
// zLink is ZEO connection between client and server.
//
// zLink provides service to make RPC requests.
// XXX and receive notification from server (server sends invalidations)
// zLink provides service to make and receive RPC requests.
//
// create zLink via dialZLink or handshake.
// once link is created .Serve must be called on it.
type zLink struct {
link net.Conn // underlying network
rxbuf rbuf.RingBuf // buffer for reading from link
// calls in-flight
// our in-flight calls
callMu sync.Mutex
callTab map[int64]chan msg // msgid -> rxc for that call; nil when closed
callID int64 // ID for next call; incremented at every call
serveWg sync.WaitGroup // for serveRecv
// ready after serveTab and notifyTab are initialized
serveReady chan struct{}
// methods peer can invoke
// methods are served in parallel
serveTab map[string]func(context.Context, interface{})interface{}
// notifications peer can send
// notifications are invoked in received order
notifyTab map[string]func(interface{}) error
serveWg sync.WaitGroup // for serveRecv and serveTab spawned from it
serveCtx context.Context // serveTab handlers are called with this ctx
serveCancel func() // to cancel serveCtx
down1 sync.Once
errDown error // error with which the link was shut down
......@@ -73,10 +85,33 @@ type zLink struct {
// (called after handshake)
func (zl *zLink) start() {
zl.callTab = make(map[int64]chan msg)
zl.serveCtx, zl.serveCancel = context.WithCancel(context.Background())
zl.serveWg.Add(1)
go zl.serveRecv()
}
// Serve serves calls from remote peer according to notifyTab and serveTab.
//
// Serve returns when zlink becomes down - either on normal Close or on error.
// On normal Close returned error == nil, otherwise it describes the reason for
// why zlink was shut down.
//
// XXX it would be better for zLink to instead provide .Recv() to receive
// peer's requests and then serve is just loop over Recv and decide what to do
// with messages by zlink user, not here.
func (zl *zLink) Serve(
notifyTab map[string]func(interface{}) error,
serveTab map[string]func(context.Context, interface{}) interface{},
) error {
// initialize serve tabs and indicate that serve is ready
zl.serveTab = serveTab
zl.notifyTab = notifyTab
close(zl.serveReady)
// wait for zlink to become down and return shutdown error
zl.serveWg.Wait()
return zl.errDown
}
var errLinkClosed = errors.New("zlink is closed")
// shutdown shuts zlink down and sets reason of why the link was shut down.
......@@ -90,6 +125,7 @@ func (zl *zLink) shutdown(err error) {
log.Printf("%s: %s", zl.link.RemoteAddr(), err)
}
zl.errDown = err
zl.serveCancel()
// notify call waiters
zl.callMu.Lock()
......@@ -111,7 +147,7 @@ func (zl *zLink) Close() error {
// serveRecv handles receives from underlying link and dispatches them to calls
// waiting results.
// waiting for results, to notify and serve handlers.
func (zl *zLink) serveRecv() {
defer zl.serveWg.Done()
for {
......@@ -139,24 +175,67 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error {
return err
}
if m.method != ".reply" {
// TODO add notification channel (server calls client by itself")
return fmt.Errorf(".%d: method=%q; expected \".reply\"", m.msgid, m.method)
// message is reply
if m.method == ".reply" {
// lookup call by msgid and dispatch result to waiter
zl.callMu.Lock()
rxc := zl.callTab[m.msgid]
if rxc != nil {
delete(zl.callTab, m.msgid)
}
zl.callMu.Unlock()
if rxc == nil {
return fmt.Errorf(".%d: unexpected reply", m.msgid)
}
rxc <- m
return nil
}
// lookup call by msgid and dispatch result to waiter
zl.callMu.Lock()
rxc := zl.callTab[m.msgid]
if rxc != nil {
delete(zl.callTab, m.msgid)
// message is notification or call
// wait until user called Serve on us
<-zl.serveReady
// message is notification
if m.flags & msgAsync != 0 {
// notifications go in-order
f := zl.notifyTab[m.method]
if f == nil {
return fmt.Errorf(".%d: unknown notification %q", m.msgid, m.method)
}
err := f(m.arg)
if err != nil {
return fmt.Errorf(".%d: %s: %s", m.msgid, m.method, err)
}
return nil
}
zl.callMu.Unlock()
if rxc == nil {
return fmt.Errorf(".%d: unexpected reply", m.msgid)
// message is call
// calls are served in parallel
f := zl.serveTab[m.method]
if f == nil {
// disconnect on call to unknown method
err = fmt.Errorf("unknown method %q", m.method)
// TODO error -> exception
zl.reply(m.msgid, err) // ignore error
return fmt.Errorf(".%d: %s", m.msgid, err)
}
zl.serveWg.Add(1)
go func() {
defer zl.serveWg.Done()
res := f(zl.serveCtx, m.arg)
// TODO error -> exception
// send result back
err := zl.reply(m.msgid, res)
if err != nil {
zl.shutdown(err)
}
}()
rxc <- m
return nil
}
......@@ -209,6 +288,24 @@ func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (
return reply, nil
}
// reply sends reply to a call received with msgid.
func (zl *zLink) reply(msgid int64, res interface{}) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("%s: .%d reply: %s", zl.link.RemoteAddr(), msgid, err)
}
}()
pkb := zl.enc.pktEncode(msg{
msgid: msgid,
flags: msgAsync,
method: ".reply",
arg: res,
})
return zl.sendPkt(pkb)
}
// ---- raw IO ----
......@@ -364,7 +461,7 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
// create raw zlink since we need to do the handshake as ZEO message exchange,
// but don't start serve goroutines yet.
zl := &zLink{link: conn}
zl := &zLink{link: conn, serveReady: make(chan struct{})}
// ready when/if handshake tx/rx exchange succeeds
hok := make(chan struct{})
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment