Commit e9678fad authored by Kirill Smelkov's avatar Kirill Smelkov

X WatchLink -> refcounted class

parent e6ce4ddd
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
# XXX doc # XXX doc
from golang cimport error from golang cimport error, refptr
from golang cimport context from golang cimport context
from libcpp cimport nullptr_t, nullptr as nil from libcpp cimport nullptr_t, nullptr as nil
...@@ -33,18 +33,24 @@ cdef extern from *: ...@@ -33,18 +33,24 @@ cdef extern from *:
ctypedef bint cbool "bool" ctypedef bint cbool "bool"
cdef extern from "wcfs_watchlink.h" nogil: cdef extern from "wcfs_watchlink.h" nogil:
cppclass WatchLink: cppclass _WatchLink:
error close() error close()
#error recvReq(ctx, PinReq *rx_into) #error recvReq(ctx, PinReq *rx_into)
pair[string, error] sendReq(context.Context ctx, const string &req) pair[string, error] sendReq(context.Context ctx, const string &req)
ctypedef WatchLink *pWatchLink # https://github.com/cython/cython/issues/534 cppclass WatchLink (refptr[_WatchLink]):
# WatchLink.X = WatchLink->X in C++
error close "_ptr()->close" ()
pair[string, error] sendReq "_ptr()->sendReq" (context.Context ctx, const string &req)
#ctypedef WatchLink *pWatchLink # https://github.com/cython/cython/issues/534
cdef extern from "wcfs.h" nogil: cdef extern from "wcfs.h" nogil:
cppclass WCFS: cppclass WCFS:
string mountpoint string mountpoint
pair[pWatchLink, error] _openwatch() # XXX pair instead of tuple #pair[pWatchLink, error] _openwatch() # XXX pair instead of tuple
pair[WatchLink, error] _openwatch() # XXX pair instead of tuple
cdef class PyWCFS: cdef class PyWCFS:
...@@ -57,7 +63,7 @@ cdef class PyWCFS: ...@@ -57,7 +63,7 @@ cdef class PyWCFS:
pywc.wc.mountpoint = v pywc.wc.mountpoint = v
cdef class PyWatchLink: cdef class PyWatchLink:
cdef WatchLink *wlink cdef WatchLink wlink
def __init__(PyWatchLink pywlink, PyWCFS pywc): def __init__(PyWatchLink pywlink, PyWCFS pywc):
with nogil: with nogil:
...@@ -68,7 +74,9 @@ cdef class PyWatchLink: ...@@ -68,7 +74,9 @@ cdef class PyWatchLink:
if err != nil: if err != nil:
raise RuntimeError(err.Error()) # XXX exc class? raise RuntimeError(err.Error()) # XXX exc class?
# XXX __dealloc__ def __dealloc__(PyWatchLink pywlink):
pywlink.wlink = nil
def close(PyWatchLink pywlink): def close(PyWatchLink pywlink):
with nogil: with nogil:
...@@ -103,11 +111,14 @@ from golang cimport topyexc ...@@ -103,11 +111,14 @@ from golang cimport topyexc
cdef nogil: cdef nogil:
pair[pWatchLink, error] wcfs_openwatch_pyexc(WCFS *wcfs) except +topyexc: #pair[pWatchLink, error] wcfs_openwatch_pyexc(WCFS *wcfs) except +topyexc:
pair[WatchLink, error] wcfs_openwatch_pyexc(WCFS *wcfs) except +topyexc:
return wcfs._openwatch() return wcfs._openwatch()
error wlink_close_pyexc(WatchLink *wlink) except +topyexc: #error wlink_close_pyexc(WatchLink *wlink) except +topyexc:
error wlink_close_pyexc(WatchLink wlink) except +topyexc:
return wlink.close() return wlink.close()
pair[string, error] wlink_sendReq_pyexc(WatchLink *wlink, context.Context ctx, const string &req) except +topyexc: #pair[string, error] wlink_sendReq_pyexc(WatchLink *wlink, context.Context ctx, const string &req) except +topyexc:
pair[string, error] wlink_sendReq_pyexc(WatchLink wlink, context.Context ctx, const string &req) except +topyexc:
return wlink.sendReq(ctx, req) return wlink.sendReq(ctx, req)
...@@ -34,7 +34,7 @@ using std::pair; ...@@ -34,7 +34,7 @@ using std::pair;
#include "wcfs_misc.h" #include "wcfs_misc.h"
struct Conn; struct Conn;
struct WatchLink; struct _WatchLink;
// WCFS represents filesystem-level connection to wcfs server. // WCFS represents filesystem-level connection to wcfs server.
...@@ -42,10 +42,10 @@ struct WatchLink; ...@@ -42,10 +42,10 @@ struct WatchLink;
struct WCFS { struct WCFS {
string mountpoint; string mountpoint;
tuple<Conn*, error> connect(zodb::Tid at); tuple<Conn*, error> connect(zodb::Tid at);
string _path(const string &obj); string _path(const string &obj);
tuple<os::File, error> _open(const string &path, int flags=O_RDONLY); tuple<os::File, error> _open(const string &path, int flags=O_RDONLY);
pair<WatchLink*, error> _openwatch(); pair<refptr<_WatchLink>, error> _openwatch();
}; };
......
...@@ -51,7 +51,6 @@ static error mmap_into_ro(void *addr, size_t size, const os::File &f, off_t offs ...@@ -51,7 +51,6 @@ static error mmap_into_ro(void *addr, size_t size, const os::File &f, off_t offs
struct Conn; struct Conn;
struct _File; struct _File;
struct _Mapping; struct _Mapping;
struct WatchLink;
struct PinReq; struct PinReq;
// Conn represents logical connection that provides view of data on wcfs // Conn represents logical connection that provides view of data on wcfs
...@@ -61,7 +60,7 @@ struct PinReq; ...@@ -61,7 +60,7 @@ struct PinReq;
struct Conn { struct Conn {
WCFS *_wc; WCFS *_wc;
zodb::Tid at; zodb::Tid at;
WatchLink *_wlink; WatchLink _wlink;
sync::Mutex _filemu; sync::Mutex _filemu;
dict<zodb::Oid, _File*> _filetab; // {} foid -> _file dict<zodb::Oid, _File*> _filetab; // {} foid -> _file
...@@ -113,7 +112,7 @@ tuple<Conn*, error> WCFS::connect(zodb::Tid at) { ...@@ -113,7 +112,7 @@ tuple<Conn*, error> WCFS::connect(zodb::Tid at) {
WCFS *wc = this; WCFS *wc = this;
// XXX err ctx // XXX err ctx
WatchLink *wlink; WatchLink wlink;
error err; error err;
tie(wlink, err) = wc->_openwatch(); tie(wlink, err) = wc->_openwatch();
if (err != nil) if (err != nil)
......
...@@ -23,9 +23,15 @@ ...@@ -23,9 +23,15 @@
#include <string.h> #include <string.h>
_WatchLink::_WatchLink() {}
_WatchLink::~_WatchLink() {}
void _WatchLink::decref() {
if (__decref())
delete this;
}
// _openwatch opens new watch link on wcfs. // _openwatch opens new watch link on wcfs.
pair<WatchLink*, error> WCFS::_openwatch() { pair<WatchLink, error> WCFS::_openwatch() {
WCFS *wc = this; WCFS *wc = this;
// head/watch handle. // head/watch handle.
...@@ -33,9 +39,9 @@ pair<WatchLink*, error> WCFS::_openwatch() { ...@@ -33,9 +39,9 @@ pair<WatchLink*, error> WCFS::_openwatch() {
error err; error err;
tie(f, err) = wc->_open("head/watch", O_RDWR); tie(f, err) = wc->_open("head/watch", O_RDWR);
if (err != nil) if (err != nil)
return make_pair((WatchLink *)NULL, err); return make_pair(nil, err);
WatchLink *wlink = new(WatchLink); WatchLink wlink = adoptref(new(_WatchLink));
wlink->_wc = wc; wlink->_wc = wc;
wlink->_f = f; wlink->_f = f;
wlink->_rx_eof = makechan<structZ>(); wlink->_rx_eof = makechan<structZ>();
...@@ -52,8 +58,8 @@ pair<WatchLink*, error> WCFS::_openwatch() { ...@@ -52,8 +58,8 @@ pair<WatchLink*, error> WCFS::_openwatch() {
return make_pair(wlink, nil); return make_pair(wlink, nil);
} }
void WatchLink::_closeTX() { void _WatchLink::_closeTX() {
WatchLink &wlink = *this; _WatchLink &wlink = *this;
wlink._txclose1.do_([&]() { wlink._txclose1.do_([&]() {
// ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up // ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
// _serveRX on client (= on us). The connection can be already closed // _serveRX on client (= on us). The connection can be already closed
...@@ -69,8 +75,8 @@ void WatchLink::_closeTX() { ...@@ -69,8 +75,8 @@ void WatchLink::_closeTX() {
} }
// close closes the link. // close closes the link.
error WatchLink::close() { error _WatchLink::close() {
WatchLink& wlink = *this; _WatchLink& wlink = *this;
wlink._closeTX(); wlink._closeTX();
#if 0 #if 0
...@@ -97,8 +103,8 @@ error WatchLink::close() { ...@@ -97,8 +103,8 @@ error WatchLink::close() {
} }
// _serveRX receives messages from ._f and dispatches them according to streamID. // _serveRX receives messages from ._f and dispatches them according to streamID.
error WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ? error _WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
WatchLink& wlink = *this; _WatchLink& wlink = *this;
// when finishing - wakeup everyone waiting for rx // when finishing - wakeup everyone waiting for rx
defer([&]() { defer([&]() {
...@@ -184,16 +190,16 @@ error WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ? ...@@ -184,16 +190,16 @@ error WatchLink::_serveRX(context::Context ctx) { // XXX error -> where ?
// //
// multiple _send can be called in parallel - _send serializes writes. // multiple _send can be called in parallel - _send serializes writes.
// XXX +ctx? // XXX +ctx?
error WatchLink::_send(StreamID stream, const string &msg) { error _WatchLink::_send(StreamID stream, const string &msg) {
WatchLink *wlink = this; _WatchLink *wlink = this;
if (msg.find('\n') != string::npos) if (msg.find('\n') != string::npos)
panic("msg has \\n"); panic("msg has \\n");
string pkt = fmt::sprintf("%lu %s\n", stream, msg.c_str()); string pkt = fmt::sprintf("%lu %s\n", stream, msg.c_str());
return wlink->_write(pkt); return wlink->_write(pkt);
} }
error WatchLink::_write(const string &pkt) { error _WatchLink::_write(const string &pkt) {
WatchLink *wlink = this; _WatchLink *wlink = this;
wlink->_txmu.lock(); wlink->_txmu.lock();
defer([&]() { defer([&]() {
...@@ -209,8 +215,8 @@ error WatchLink::_write(const string &pkt) { ...@@ -209,8 +215,8 @@ error WatchLink::_write(const string &pkt) {
// sendReq sends client -> server request and returns server reply. // sendReq sends client -> server request and returns server reply.
// XXX -> reply | None when EOF // XXX -> reply | None when EOF
pair<string, error> WatchLink::sendReq(context::Context ctx, const string &req) { pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req) {
WatchLink *wlink = this; _WatchLink *wlink = this;
// XXX err ctx // XXX err ctx
rxPkt rx; rxPkt rx;
...@@ -231,8 +237,8 @@ pair<string, error> WatchLink::sendReq(context::Context ctx, const string &req) ...@@ -231,8 +237,8 @@ pair<string, error> WatchLink::sendReq(context::Context ctx, const string &req)
return make_pair(reply, nil); return make_pair(reply, nil);
} }
tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(context::Context ctx, const string &req) { tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, const string &req) {
WatchLink *wlink = this; _WatchLink *wlink = this;
// XXX err ctx? // XXX err ctx?
wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock) wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock)
...@@ -269,8 +275,8 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(context::Context ctx, const ...@@ -269,8 +275,8 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(context::Context ctx, const
// recvReq receives client <- server request. // recvReq receives client <- server request.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt); static error _parsePinReq(PinReq *pin, const rxPkt *pkt);
error WatchLink::recvReq(context::Context ctx, PinReq *prx) { error _WatchLink::recvReq(context::Context ctx, PinReq *prx) {
WatchLink& wlink = *this; _WatchLink& wlink = *this;
// XXX err ctx? // XXX err ctx?
rxPkt pkt; rxPkt pkt;
...@@ -329,8 +335,8 @@ static error _parsePinReq(PinReq *pin, const rxPkt *pkt) { ...@@ -329,8 +335,8 @@ static error _parsePinReq(PinReq *pin, const rxPkt *pkt) {
} }
// _readline reads next raw line sent from wcfs. // _readline reads next raw line sent from wcfs.
tuple<string, error> WatchLink::_readline() { tuple<string, error> _WatchLink::_readline() {
WatchLink& wlink = *this; _WatchLink& wlink = *this;
char buf[128]; char buf[128];
size_t nl_searchfrom = 0; size_t nl_searchfrom = 0;
......
...@@ -66,7 +66,9 @@ static_assert(sizeof(rxPkt) == 128); ...@@ -66,7 +66,9 @@ static_assert(sizeof(rxPkt) == 128);
// .close() closes the link. // .close() closes the link.
// //
// It is safe to use WatchLink from multiple threads simultaneously. // It is safe to use WatchLink from multiple threads simultaneously.
class WatchLink { class _WatchLink;
typedef refptr<_WatchLink> WatchLink;
class _WatchLink : public object {
WCFS *_wc; WCFS *_wc;
os::File _f; // head/watch file handle os::File _f; // head/watch file handle
string _rxbuf; // buffer for read data from _f string _rxbuf; // buffer for read data from _f
...@@ -89,8 +91,15 @@ class WatchLink { ...@@ -89,8 +91,15 @@ class WatchLink {
sync.WorkGroup *_serveWG sync.WorkGroup *_serveWG
#endif #endif
// don't new - create only via WCFS._openwatch()
private:
_WatchLink();
~_WatchLink();
friend pair<WatchLink, error> WCFS::_openwatch();
public:
void decref();
public: public:
friend pair<WatchLink*, error> WCFS::_openwatch();
error close(); error close();
error recvReq(context::Context ctx, PinReq *rx_into); error recvReq(context::Context ctx, PinReq *rx_into);
pair<string, error> sendReq(context::Context ctx, const string &req); pair<string, error> sendReq(context::Context ctx, const string &req);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment