Commit 1b236e16 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6ee3f42d
...@@ -67,14 +67,14 @@ pair<WatchLink, error> WCFS::_openwatch() { ...@@ -67,14 +67,14 @@ pair<WatchLink, error> WCFS::_openwatch() {
} }
error _WatchLink::closeWrite() { error _WatchLink::closeWrite() {
_WatchLink &wlink = *this; _WatchLink& wlink = *this;
xerr::Contextf E("%s: closeWrite", v(wlink)); xerr::Contextf E("%s: closeWrite", v(wlink));
wlink._txclose1.do_([&]() { wlink._txclose1.do_([&]() {
// ask wcfs to close its tx & rx sides; wcfs.close(tx) wakes up // ask wcfs to close its tx & rx sides; wcfs.close(tx) wakes up
// _serveRX on client (= on us). The connection can be already closed // _serveRX on client (= on us). The connection can be already closed
// by wcfs - so ignore errors when sending bye. // by wcfs - so ignore errors when sending bye.
(void)wlink._send(1, "bye"); // XXX stream ok? (void)wlink._send(wlink._nextReqID(), "bye");
// XXX vvv should be ~ shutdown(TX, wlink._f), however shutdown does // XXX vvv should be ~ shutdown(TX, wlink._f), however shutdown does
// not work for non-socket file descriptors. And even if we dup link // not work for non-socket file descriptors. And even if we dup link
...@@ -238,7 +238,7 @@ error _WatchLink::_write(const string &pkt) { ...@@ -238,7 +238,7 @@ error _WatchLink::_write(const string &pkt) {
wlink->_txmu.unlock(); wlink->_txmu.unlock();
}); });
//printf('C: watch : tx: %r' % pkt) //trace('C: watch : tx: %r' % pkt)
int n; int n;
error err; error err;
tie(n, err) = wlink->_f->write(pkt.c_str(), pkt.size()); tie(n, err) = wlink->_f->write(pkt.c_str(), pkt.size());
...@@ -276,14 +276,23 @@ pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req) ...@@ -276,14 +276,23 @@ pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req)
return make_pair(reply, nil); return make_pair(reply, nil);
} }
// _nextReqID returns stream ID for next client-originating request to be made.
// XXX place=?
StreamID _WatchLink::_nextReqID() {
_WatchLink& wlink = *this;
wlink._txmu.lock(); // TODO ._req_next -> atomic (currently uses arbitrary lock)
StreamID stream = wlink._req_next;
wlink._req_next = (wlink._req_next + 2); // wraparound at uint64 max
wlink._txmu.unlock();
return stream;
}
tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, const string &req) { tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, const string &req) {
_WatchLink *wlink = this; _WatchLink *wlink = this;
// XXX errctx? // XXX errctx?
wlink->_txmu.lock(); // TODO ._req_next -> atomic (currently uses arbitrary lock) StreamID stream = wlink->_nextReqID();
StreamID stream = wlink->_req_next;
wlink->_req_next = (wlink->_req_next + 2); // wraparound at uint64 max
wlink->_txmu.unlock();
auto rxq = makechan<rxPkt>(1); auto rxq = makechan<rxPkt>(1);
wlink->_rxmu.lock(); wlink->_rxmu.lock();
......
...@@ -118,6 +118,7 @@ private: ...@@ -118,6 +118,7 @@ private:
tuple<string, error> _readline(); tuple<string, error> _readline();
error _send(StreamID stream, const string &msg); error _send(StreamID stream, const string &msg);
error _write(const string &pkt); error _write(const string &pkt);
StreamID _nextReqID();
tuple<chan<rxPkt>, error> _sendReq(context::Context ctx, const string &req); tuple<chan<rxPkt>, error> _sendReq(context::Context ctx, const string &req);
friend error _twlinkwrite(WatchLink wlink, const string &pkt); friend error _twlinkwrite(WatchLink wlink, const string &pkt);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment