Commit 00ace0e8 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/zeo: Correctly prefix zLink-related errors with operational context

Go through zrpc and add error context to every function that can create
error. This is similar to what NEO does in neonet. The reason to do this
was the following obscure error from wcfs:

    E1105 11:32:33.295497   24639 wcfs.go:2505] zwatch zeo://:28359: EOF

which, after this patch, is now

    E1105 12:53:15.922052   30731 wcfs.go:2510] zwatch zeo://:27024: zlink 127.0.0.1:47768 - 127.0.0.1:27024: recvPkt: EOF
parent 4d781f60
...@@ -82,6 +82,13 @@ type zLink struct { ...@@ -82,6 +82,13 @@ type zLink struct {
enc encoding // protocol encoding in use ('Z' or 'M') enc encoding // protocol encoding in use ('Z' or 'M')
} }
// zLinkError is returned by zLink operations.
type zLinkError struct {
zlink *zLink
op string
err error
}
// (called after handshake) // (called after handshake)
func (zl *zLink) start() { func (zl *zLink) start() {
zl.callTab = make(map[int64]chan msg) zl.callTab = make(map[int64]chan msg)
...@@ -122,7 +129,7 @@ func (zl *zLink) shutdown(err error) { ...@@ -122,7 +129,7 @@ func (zl *zLink) shutdown(err error) {
err = err2 err = err2
} }
if err != nil { if err != nil {
log.Printf("%s: %s", zl.link.RemoteAddr(), err) log.Printf("%s", err)
} }
zl.errDown = err zl.errDown = err
zl.serveCancel() zl.serveCancel()
...@@ -168,7 +175,9 @@ func (zl *zLink) serveRecv() { ...@@ -168,7 +175,9 @@ func (zl *zLink) serveRecv() {
} }
// serveRecv1 handles 1 incoming packet. // serveRecv1 handles 1 incoming packet.
func (zl *zLink) serveRecv1(pkb *pktBuf) error { func (zl *zLink) serveRecv1(pkb *pktBuf) (err error) {
defer zl.errctx(&err, "serveRecv1")
// decode packet // decode packet
m, err := zl.enc.pktDecode(pkb) m, err := zl.enc.pktDecode(pkb)
if err != nil { if err != nil {
...@@ -241,11 +250,7 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error { ...@@ -241,11 +250,7 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error {
// Call makes 1 RPC call to server, waits for reply and returns it. // Call makes 1 RPC call to server, waits for reply and returns it.
func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (reply msg, err error) { func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (reply msg, err error) {
defer func() { defer zl.errctxf(&err, "call %s", method)
if err != nil {
err = fmt.Errorf("%s: call %s: %s", zl.link.RemoteAddr(), method, err)
}
}()
rxc := make(chan msg, 1) // reply will go here rxc := make(chan msg, 1) // reply will go here
...@@ -290,11 +295,7 @@ func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) ( ...@@ -290,11 +295,7 @@ func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (
// reply sends reply to a call received with msgid. // reply sends reply to a call received with msgid.
func (zl *zLink) reply(msgid int64, res interface{}) (err error) { func (zl *zLink) reply(msgid int64, res interface{}) (err error) {
defer func() { defer zl.errctxf(&err, ".%d reply", msgid)
if err != nil {
err = fmt.Errorf("%s: .%d reply: %s", zl.link.RemoteAddr(), msgid, err)
}
}()
pkb := zl.enc.pktEncode(msg{ pkb := zl.enc.pktEncode(msg{
msgid: msgid, msgid: msgid,
...@@ -366,9 +367,11 @@ const dumpio = false ...@@ -366,9 +367,11 @@ const dumpio = false
// sendPkt sends 1 raw ZEO packet. // sendPkt sends 1 raw ZEO packet.
// //
// pkb is freed upon return. // pkb is freed upon return.
func (zl *zLink) sendPkt(pkb *pktBuf) error { func (zl *zLink) sendPkt(pkb *pktBuf) (err error) {
defer zl.errctx(&err, "sendPkt")
pkb.Fixup() pkb.Fixup()
_, err := zl.link.Write(pkb.Bytes()) _, err = zl.link.Write(pkb.Bytes())
if dumpio { if dumpio {
fmt.Printf("%v > %v: %q\n", zl.link.LocalAddr(), zl.link.RemoteAddr(), pkb.Bytes()) fmt.Printf("%v > %v: %q\n", zl.link.LocalAddr(), zl.link.RemoteAddr(), pkb.Bytes())
} }
...@@ -380,7 +383,9 @@ func (zl *zLink) sendPkt(pkb *pktBuf) error { ...@@ -380,7 +383,9 @@ func (zl *zLink) sendPkt(pkb *pktBuf) error {
// //
// the packet returned contains both header and payload. // the packet returned contains both header and payload.
// XXX almost dup from NEO. // XXX almost dup from NEO.
func (zl *zLink) recvPkt() (*pktBuf, error) { func (zl *zLink) recvPkt() (_ *pktBuf, err error) {
defer zl.errctx(&err, "recvPkt")
pkb := allocPkb() pkb := allocPkb()
data := pkb.data[:cap(pkb.data)] data := pkb.data[:cap(pkb.data)]
...@@ -549,3 +554,38 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) { ...@@ -549,3 +554,38 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
zl.start() zl.start()
return zl, nil return zl, nil
} }
// ---- for convenience: String / Error / Cause ----
func (zlink *zLink) String() string {
return fmt.Sprintf("zlink %s - %s", zlink.link.LocalAddr(), zlink.link.RemoteAddr())
}
func (e *zLinkError) Error() string {
return fmt.Sprintf("%s: %s: %s", e.zlink, e.op, e.err)
}
func (e *zLinkError) Cause() error { return e.err }
func (e *zLinkError) Unwrap() error { return e.err }
func (zlink *zLink) err(op string, e error) error {
if e == nil {
return nil
}
return &zLinkError{zlink: zlink, op: op, err: e}
}
// errctx* are similar to xerr.Context* but create zLinkError instead of fmt.Errorf.
func (zlink *zLink) errctx(errp *error, op string) {
if *errp == nil {
return
}
*errp = zlink.err(op, *errp)
}
func (zlink *zLink) errctxf(errp *error, format string, argv ...interface{}) {
if *errp == nil {
return
}
*errp = zlink.err(fmt.Sprintf(format, argv...), *errp)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment