Commit 9c3e0f4b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 538ac6f8
......@@ -149,6 +149,17 @@ struct dict : std::unordered_map<Key, Value> {
return make_tuple(Value(), false);
return make_tuple(_->second, true);
}
// pop implements `d[k] -> (v, ok); del d[k]`.
tuple<Value, bool> pop(const Key &k) {
dict &d = *this;
auto _ = d.find(k);
if (_ == d.end())
return make_tuple(Value(), false);
Value v = _->second;
d.erase(_);
return make_tuple(v, true);
}
};
// set wraps unordered_set into ergonomic interface.
......
......@@ -186,6 +186,7 @@ class WatchLink {
// inv.protocol message IO
chan<rxPkt> _acceptq; // server originated messages go here
sync::Mutex _rxmu;
bool _rxdown;
dict<StreamID, chan<rxPkt>>
_rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet
......@@ -207,7 +208,7 @@ public:
private:
void _closeTX();
void _serveRX(IContext *ctx);
error _serveRX(IContext *ctx);
error _send(StreamID stream, const string &msg);
error _write(const string &pkt);
tuple<chan<rxPkt>, error> _sendReq(IContext *ctx, const string &req);
......@@ -394,6 +395,7 @@ tuple<WatchLink*, error> WCFS::_openwatch() {
wlink->_f = f;
wlink->_rx_eof = makechan<structZ>();
wlink->_acceptq = makechan<rxPkt>();
wlink->_rxdown = false;
wlink->_req_next = 1;
#if 0
......@@ -450,25 +452,27 @@ error WatchLink::close() {
}
// _serveRX receives messages from ._f and dispatches them according to streamID.
void WatchLink::_serveRX(IContext *ctx) { // XXX void -> ?
error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ?
WatchLink& wlink = *this;
// when finishing - wakeup everyone waiting for rx
defer([&]() {
wlink._acceptq.close();
with wlink._rxmu:
rxtab = wlink._rxtab
wlink._rxtab = None // don't allow new rxtab registers
for rxq in rxtab.values():
wlink._rxmu.lock();
wlink._rxdown = true; // don't allow new rxtab registers
wlink._rxmu.unlock();
for (auto rxqi : wlink._rxtab) {
auto rxq = rxqi.second;
rxq.close();
}
});
while (1) {
// NOTE: .close() makes sure .f.read*() will wake up
l = wlink.f.readline()
l = wlink._f.readline()
printf("C: watch : rx: %r" % l);
if (len(l) == 0) { // peer closed its tx
wlink.rx_eof.close();
wlink._rx_eof.close();
break;
}
......@@ -486,26 +490,39 @@ void WatchLink::_serveRX(IContext *ctx) { // XXX void -> ?
bool reply = (stream % 2 != 0);
if (reply) {
with wlink._rxmu:
assert stream in wlink._rxtab // XXX !test assert - recheck
rxq = wlink._rxtab.pop(stream)
_, _rx = select(
ctx.done().recv, // 0
(rxq.send, msg), // 1
)
if _ == 0:
raise ctx.err()
chan<rxPkt> rxq;
bool ok;
wlink._rxmu.lock();
tie(rxq, ok) = wlink._rxtab.pop(stream);
wlink._rxmu.unlock();
if (!ok) {
// wcfs sent reply on unexpected stream
// XXX log + dowmn.
continue;
}
int _ = select({
ctx->done().recvs(), // 0
rxq.sends(msg), // 1
});
if (_ == 0)
return ctx->err();
}
else {
with wlink._rxmu:
assert stream not in wlink._accepted // XXX !test assert - recheck
wlink._accepted.add(stream)
_, _rx = select(
ctx.done().recv, // 0
(wlink._acceptq.send, (stream, msg)), // 1
)
if _ == 0:
raise ctx.err()
wlink._rxmu.lock();
if (wlink._accepted.has(stream)) {
wlink._rxmu.unlock();
// XXX log + down
continue;
}
wlink._accepted.insert(stream);
wlink._rxmu.unlock();
int _ = select({
ctx->done().recvs(), // 0
wlink._acceptq.sends((stream, msg)), // 1
});
if (_ == 0)
return ctx->err();
}
}
}
......@@ -573,6 +590,10 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string
auto rxq = makechan<rxPkt>(1);
wlink->_rxmu.lock();
if (wlink->_rxdown) {
wlink->_rxmu.unlock();
return EClosed;
}
if (wlink->_rxtab.has(stream)) {
wlink->_rxmu.unlock();
panic("BUG: to-be-sent stream is present in rxtab");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment