Commit f7d0d2da authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2eb9c369
...@@ -84,9 +84,10 @@ struct SrvReq; ...@@ -84,9 +84,10 @@ struct SrvReq;
struct WCFS { struct WCFS {
string mountpoint; string mountpoint;
Conn *connect(Tid at); tuple<Conn*, error> connect(Tid at);
string _path(const string &obj); string _path(const string &obj);
tuple<os::File, error> _open(const string &path, int mode=O_RDONLY); tuple<os::File, error> _open(const string &path, int mode=O_RDONLY);
tuple<WatchLink*, error> _openwatch();
}; };
// Conn represents logical connection that provides view of data on wcfs // Conn represents logical connection that provides view of data on wcfs
...@@ -138,6 +139,9 @@ struct _Mapping { ...@@ -138,6 +139,9 @@ struct _Mapping {
}; };
// StreamID stands for ID of a stream used over WatchLink.
typedef uint64_t StreamID;
// WatchLink represents /head/watch link opened on wcfs. // WatchLink represents /head/watch link opened on wcfs.
// //
// It is created by WCFS::_openwatch(). // It is created by WCFS::_openwatch().
...@@ -152,13 +156,13 @@ class WatchLink { ...@@ -152,13 +156,13 @@ class WatchLink {
chan<structZ> _rxeof; // becomes ready when wcfs closes its tx side chan<structZ> _rxeof; // becomes ready when wcfs closes its tx side
// inv.protocol message IO // inv.protocol message IO
chan<(stream,msg)> _acceptq; // server originated messages go here chan<(StreamID,msg)> _acceptq; // server originated messages go here
sync::Mutex _rxmu; sync::Mutex _rxmu;
dict<stream, chan<XXX>> _rxtab; // {} stream -> rxq server replies go via here dict<StreamID, chan<XXX>> _rxtab; // {} stream -> rxq server replies go via here
set<stream> _accepted; // streams we accepted but did not replied yet set<StreamID> _accepted; // streams we accepted but did not replied yet
XXXint _req_next = 1; // stream ID for next client-originated request XXX -> atomic StreamID _req_next = 1; // stream ID for next client-originated request XXX -> atomic
sync::Mutex _txmu // serializes writes sync::Mutex _txmu; // serializes writes
bool _txclosed; // XXX = False bool _txclosed; // XXX = False
#if 0 #if 0
...@@ -173,7 +177,7 @@ public: ...@@ -173,7 +177,7 @@ public:
// SrvReq represents 1 server-initiated wcfs request received over /head/watch link. // SrvReq represents 1 server-initiated wcfs request received over /head/watch link.
// XXX -> PinReq? // XXX -> PinReq?
struct SrvReq { struct SrvReq {
uint64_t stream // request was received with this stream ID StreamID stream; // request was received with this stream ID
Oid foid; // request is about this file Oid foid; // request is about this file
int64_t blk; // ----//---- about this block int64_t blk; // ----//---- about this block
Tid at; // pin to this at XXX ffff = None = unpin Tid at; // pin to this at XXX ffff = None = unpin
...@@ -181,16 +185,21 @@ struct SrvReq { ...@@ -181,16 +185,21 @@ struct SrvReq {
// connect creates new Conn viewing WCFS state as of @at. // connect creates new Conn viewing WCFS state as of @at.
Conn *WCFS::connect(Tid at) { tuple<Conn*, error> WCFS::connect(Tid at) {
WCFS *wc = this; WCFS *wc = this;
// XXX err ctx
WatchLink *wlink;
error err;
tie(wlink, err) = wc->_openwatch();
if (err != nil)
return make_tuple((Conn *)NULL, err);
Conn *wconn = new Conn(); Conn *wconn = new Conn();
// TODO support !isolated mode // TODO support !isolated mode
wconn->_wc = wc; wconn->_wc = wc;
wconn->at = at; wconn->at = at;
// wconn._wlink = WatchLink(wc) XXX wconn->_wlink = wlink;
tie(wcomm._wlink, err) = wc->_openwatch();
// XXX err
// XXX reenable // XXX reenable
...@@ -200,7 +209,7 @@ Conn *WCFS::connect(Tid at) { ...@@ -200,7 +209,7 @@ Conn *WCFS::connect(Tid at) {
wconn._pinWG.go(wconn._pinner) wconn._pinWG.go(wconn._pinner)
#endif #endif
return wconn; return make_tuple(wconn, nil);
} }
// XXX Conn::close // XXX Conn::close
...@@ -329,11 +338,13 @@ tuple<WatchLink*, error> WCFS::_openwatch() { ...@@ -329,11 +338,13 @@ tuple<WatchLink*, error> WCFS::_openwatch() {
WCFS *wc = this; WCFS *wc = this;
// head/watch handle. // head/watch handle.
os::File f;
error err;
tie(f, err) = wc->_open("head/watch", O_RDWR); tie(f, err) = wc->_open("head/watch", O_RDWR);
if (err != nil) if (err != nil)
return make_tuple(NULL, err); return make_tuple((WatchLink *)NULL, err);
wlink = new(WatchLink); WatchLink *wlink = new(WatchLink);
wlink->_wc = wc; wlink->_wc = wc;
wlink->_f = f; wlink->_f = f;
wlink->_rx_eof = makechan<structZ>(); wlink->_rx_eof = makechan<structZ>();
...@@ -346,6 +357,8 @@ tuple<WatchLink*, error> WCFS::_openwatch() { ...@@ -346,6 +357,8 @@ tuple<WatchLink*, error> WCFS::_openwatch() {
wlink->_serveWG = sync.WorkGroup(serveCtx) wlink->_serveWG = sync.WorkGroup(serveCtx)
wlink->_serveWG.go(wlink._serveRX) wlink->_serveWG.go(wlink._serveRX)
#endif #endif
return make_tuple(wlink, nil);
} }
// XXX close // XXX close
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment