Commit 7a731b3c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2f24a37d
...@@ -53,7 +53,7 @@ struct dict : std::unordered_map<Key, Value> { ...@@ -53,7 +53,7 @@ struct dict : std::unordered_map<Key, Value> {
// has returns whether container d (e.g. dict) has element k. // has returns whether container d (e.g. dict) has element k.
bool has(const Key &k) const { bool has(const Key &k) const {
const dict *d = this; const dict *d = this;
return d->find(k) != d.end(); return d->find(k) != d->end();
} }
// get implements `d[k] -> (v, ok)`. // get implements `d[k] -> (v, ok)`.
...@@ -62,7 +62,7 @@ struct dict : std::unordered_map<Key, Value> { ...@@ -62,7 +62,7 @@ struct dict : std::unordered_map<Key, Value> {
auto _ = d->find(k); auto _ = d->find(k);
if (_ == d->end()) if (_ == d->end())
return make_tuple(Value(), false); return make_tuple(Value(), false);
return make_tuple(_.second(), true); return make_tuple(_->second, true);
} }
}; };
...@@ -84,7 +84,8 @@ static error mmap_zero_into_ro(void *addr, size_t size); ...@@ -84,7 +84,8 @@ static error mmap_zero_into_ro(void *addr, size_t size);
// XXX ok? // XXX ok?
struct IContext { struct IContext {
virtual chan<structZ> done() = 0; virtual chan<structZ> done() = 0;
virtual error err() = 0;
}; };
struct Conn; struct Conn;
...@@ -203,6 +204,12 @@ class WatchLink { ...@@ -203,6 +204,12 @@ class WatchLink {
public: public:
friend tuple<WatchLink*, error> WCFS::_openwatch(); friend tuple<WatchLink*, error> WCFS::_openwatch();
SrvReq *recvReq(IContext *ctx); SrvReq *recvReq(IContext *ctx);
tuple<string, error> sendReq(IContext *ctx, const string &req);
private:
error _send(StreamID stream, const string &msg);
error _write(const string &pkt);
tuple<chan<rxPkt>, error> _sendReq(IContext *ctx, const string &req);
}; };
// SrvReq represents 1 server-initiated wcfs request received over /head/watch link. // SrvReq represents 1 server-initiated wcfs request received over /head/watch link.
...@@ -426,7 +433,7 @@ error WatchLink::_write(const string &pkt) { ...@@ -426,7 +433,7 @@ error WatchLink::_write(const string &pkt) {
wlink->_txmu.lock(); wlink->_txmu.lock();
defer([&]() { defer([&]() {
wlink->_txmu.unlock(); wlink->_txmu.unlock();
}) });
//printf('C: watch : tx: %r' % pkt) //printf('C: watch : tx: %r' % pkt)
int n; int n;
...@@ -448,15 +455,15 @@ tuple<string, error> WatchLink::sendReq(IContext *ctx, const string &req) { ...@@ -448,15 +455,15 @@ tuple<string, error> WatchLink::sendReq(IContext *ctx, const string &req) {
rxPkt rx; rxPkt rx;
chan<rxPkt> rxq; chan<rxPkt> rxq;
error err; error err;
tie(rxq, err) = wlink._sendReq(ctx, req); tie(rxq, err) = wlink->_sendReq(ctx, req);
// XXX err // XXX err
_ = select( int _ = select({
ctx.done().recvs(), // 0 ctx->done().recvs(), // 0
rxq.recvs(&rx), // 1 rxq.recvs(&rx), // 1
); });
if (_ == 0) if (_ == 0)
return make_tuple("", ctx.err()); return make_tuple("", ctx->err());
// XXX check for EOF // XXX check for EOF
string reply = rx.XXXtostring(); string reply = rx.XXXtostring();
...@@ -465,6 +472,7 @@ tuple<string, error> WatchLink::sendReq(IContext *ctx, const string &req) { ...@@ -465,6 +472,7 @@ tuple<string, error> WatchLink::sendReq(IContext *ctx, const string &req) {
tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string &req) { tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string &req) {
WatchLink *wlink = this; WatchLink *wlink = this;
// XXX err ctx?
wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock) wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock)
StreamID stream = wlink->_req_next; StreamID stream = wlink->_req_next;
...@@ -473,21 +481,21 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string ...@@ -473,21 +481,21 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string
auto rxq = makechan<rxPkt>(1); auto rxq = makechan<rxPkt>(1);
wlink->_rxmu.lock(); wlink->_rxmu.lock();
if (has(wlink->rxtab, stream)) { if (wlink->_rxtab.has(stream)) {
wlink->_rxmu.unlock(); wlink->_rxmu.unlock();
panic("BUG: to-be-sent stream is present in rxtab"); panic("BUG: to-be-sent stream is present in rxtab");
} }
wlink->_rxtab[stream] = rxq; wlink->_rxtab[stream] = rxq;
wlink->_rxmu.unlock(); wlink->_rxmu.unlock();
err = wlink->_send(stream, req); error err = wlink->_send(stream, req);
if (err != nil) { if (err != nil) {
// remove rxq from rxtab // remove rxq from rxtab
wlink->_rxmu.lock(); wlink->_rxmu.lock();
wlink->rxtab.erase(stream) wlink->_rxtab.erase(stream);
wlink->_rxmu.unlock(); wlink->_rxmu.unlock();
// no need to drain rxq - it was created with cap=1 // no need to drain rxq - it was created with cap=1
rxq = nil; rxq = nil;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment