Commit 22a93966 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5bb2accb
...@@ -257,47 +257,8 @@ error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string ...@@ -257,47 +257,8 @@ error _WatchLink::replyReq(context::Context ctx, const PinReq *req, const string
} }
// _send sends raw message via specified stream.
//
// multiple _send can be called in parallel - _send serializes writes.
// msg must not include \n.
//
// XXX +ctx?
error _WatchLink::_send(StreamID stream, const string &msg) {
_WatchLink& wlink = *this;
xerr::Contextf E("%s: send .%d", v(wlink), stream);
if (msg.find('\n') != string::npos)
panic("msg has \\n");
string pkt = fmt::sprintf("%lu %s\n", stream, v(msg));
return E(wlink._write(pkt));
}
error _twlinkwrite(WatchLink wlink, const string &pkt) {
return wlink->_write(pkt);
}
error _WatchLink::_write(const string &pkt) {
_WatchLink& wlink = *this;
// no errctx
wlink._txmu.lock();
defer([&]() {
wlink._txmu.unlock();
});
//trace('C: watch : tx: \"%s\"', v(pkt));
int n;
error err;
tie(n, err) = wlink._f->write(pkt.c_str(), pkt.size());
return err;
}
// sendReq sends client -> server request and returns server reply. // sendReq sends client -> server request and returns server reply.
// XXX -> reply | None when EOF pair</*reply*/string, error> _WatchLink::sendReq(context::Context ctx, const string &req) {
pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req) {
_WatchLink& wlink = *this; _WatchLink& wlink = *this;
xerr::Contextf E("%s: sendReq", v(wlink)); // XXX + streamID xerr::Contextf E("%s: sendReq", v(wlink)); // XXX + streamID
...@@ -326,18 +287,6 @@ pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req) ...@@ -326,18 +287,6 @@ pair<string, error> _WatchLink::sendReq(context::Context ctx, const string &req)
return make_pair(reply, nil); return make_pair(reply, nil);
} }
// _nextReqID returns stream ID for next client-originating request to be made.
// XXX place=?
StreamID _WatchLink::_nextReqID() {
_WatchLink& wlink = *this;
wlink._txmu.lock(); // TODO ._req_next -> atomic (currently uses arbitrary lock)
StreamID stream = wlink._req_next;
wlink._req_next = (wlink._req_next + 2); // wraparound at uint64 max
wlink._txmu.unlock();
return stream;
}
tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, const string &req) { tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, const string &req) {
_WatchLink& wlink = *this; _WatchLink& wlink = *this;
// XXX errctx? // XXX errctx?
...@@ -372,6 +321,45 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons ...@@ -372,6 +321,45 @@ tuple</*rxq*/chan<rxPkt>, error> _WatchLink::_sendReq(context::Context ctx, cons
} }
// _send sends raw message via specified stream.
//
// multiple _send can be called in parallel - _send serializes writes.
// msg must not include \n.
//
// XXX +ctx?
error _WatchLink::_send(StreamID stream, const string &msg) {
_WatchLink& wlink = *this;
xerr::Contextf E("%s: send .%d", v(wlink), stream);
if (msg.find('\n') != string::npos)
panic("msg has \\n");
string pkt = fmt::sprintf("%lu %s\n", stream, v(msg));
return E(wlink._write(pkt));
}
error _twlinkwrite(WatchLink wlink, const string &pkt) {
return wlink->_write(pkt);
}
error _WatchLink::_write(const string &pkt) {
_WatchLink& wlink = *this;
// no errctx
wlink._txmu.lock();
defer([&]() {
wlink._txmu.unlock();
});
//trace('C: watch : tx: \"%s\"', v(pkt));
int n;
error err;
tie(n, err) = wlink._f->write(pkt.c_str(), pkt.size());
return err;
}
// _parsePinReq parses message into PinReq according to wcfs isolation protocol. // _parsePinReq parses message into PinReq according to wcfs isolation protocol.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt) { static error _parsePinReq(PinReq *pin, const rxPkt *pkt) {
pin->stream = pkt->stream; pin->stream = pkt->stream;
...@@ -504,4 +492,15 @@ int _WatchLink::fd() const { ...@@ -504,4 +492,15 @@ int _WatchLink::fd() const {
return wlink._f->fd(); return wlink._f->fd();
} }
// _nextReqID returns stream ID for next client-originating request to be made.
StreamID _WatchLink::_nextReqID() {
_WatchLink& wlink = *this;
wlink._txmu.lock(); // TODO ._req_next -> atomic (currently uses arbitrary lock)
StreamID stream = wlink._req_next;
wlink._req_next = (wlink._req_next + 2); // wraparound at uint64 max
wlink._txmu.unlock();
return stream;
}
} // wcfs:: } // wcfs::
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment