Commit 5c02054e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 350027d5
...@@ -36,6 +36,7 @@ using namespace golang; ...@@ -36,6 +36,7 @@ using namespace golang;
namespace io { namespace io {
const error EOF_ = fmt::errorf("EOF"); const error EOF_ = fmt::errorf("EOF");
const error ErrUnexpectedEOF = fmt::errorf("unexpected EOF");
} // io:: } // io::
......
...@@ -51,6 +51,7 @@ struct error { ...@@ -51,6 +51,7 @@ struct error {
namespace io { namespace io {
extern const error EOF_; extern const error EOF_;
extern const error ErrUnexpectedEOF;
} // io:: } // io::
......
...@@ -209,6 +209,7 @@ public: ...@@ -209,6 +209,7 @@ public:
private: private:
void _closeTX(); void _closeTX();
error _serveRX(IContext *ctx); error _serveRX(IContext *ctx);
tuple<string, error> _readline();
error _send(StreamID stream, const string &msg); error _send(StreamID stream, const string &msg);
error _write(const string &pkt); error _write(const string &pkt);
tuple<chan<rxPkt>, error> _sendReq(IContext *ctx, const string &req); tuple<chan<rxPkt>, error> _sendReq(IContext *ctx, const string &req);
...@@ -467,14 +468,19 @@ error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ? ...@@ -467,14 +468,19 @@ error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ?
} }
}); });
string l;
error err;
while (1) { while (1) {
// NOTE: .close() makes sure .f.read*() will wake up // NOTE: .close() makes sure .f.read*() will wake up
l = wlink._f.readline() tie(l, err) = wlink._readline();
printf("C: watch : rx: %r" % l); if (err == io::EOF_) { // peer closed its tx
if (len(l) == 0) { // peer closed its tx // XXX what happens with other errors?
wlink._rx_eof.close(); wlink._rx_eof.close();
break;
} }
if (err != nil)
return err;
printf("C: watch : rx: %s", l.c_str());
// <stream> ... \n // <stream> ... \n
stream, msg = l.split(' ', 1) stream, msg = l.split(' ', 1)
...@@ -618,6 +624,35 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string ...@@ -618,6 +624,35 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string
return make_tuple(rxq, err); return make_tuple(rxq, err);
} }
// _readline reads next raw line sent from wcfs.
tuple<string, error> WatchLink::_readline() {
WatchLink& wlink = *this;
char buf[128];
pos = 0; // XXX naming
while (1) {
nl = wlink._rxbuf.find('\n', pos);
if (nl != string::npos) {
line = string(wlink._rxbuf, 0, nl+1);
wlink._rxbuf = string(wlink._rxbuf, nl+1);
return make_tuple(line, nil);
}
pos = wlink._rxbuf.length();
tie(n, err) = wlink._f.read(buf, sizeof(buf));
if (n > 0) {
// XXX limit line length to avoid DoS
wlink._rxbuf += string(buf, n);
continue;
}
if (err == nil)
panic("read returned (0, nil)");
if (err == io::EOF_ && wlink._rxbuf.length() != 0)
err = io::ErrUnexpectedEOF;
return make_tuple("", err);
}
}
// ---- WCFS raw file access ---- // ---- WCFS raw file access ----
// _path returns path for object on wcfs. // _path returns path for object on wcfs.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment