Commit c2e56e80 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1301bbc7
...@@ -39,11 +39,21 @@ struct error { ...@@ -39,11 +39,21 @@ struct error {
error() {} error() {}
error(nullptr_t) {} // = nil error(nullptr_t) {} // = nil
bool operator==(nullptr_t) { // == nil
// == nil
bool operator==(nullptr_t) const {
return err.empty(); return err.empty();
} }
bool operator!=(nullptr_t) { // != nil bool operator!=(nullptr_t) const {
return !(this==NULL); return !(*this==NULL);
}
// == error
bool operator==(const error &rhs) const {
return (err == rhs.err);
}
bool operator!=(const error &rhs) const {
return !(*this==rhs);
} }
}; };
......
...@@ -34,6 +34,7 @@ using namespace golang; ...@@ -34,6 +34,7 @@ using namespace golang;
#include <wendelin/bigfile/ram.h> #include <wendelin/bigfile/ram.h>
#include <wendelin/bug.h> #include <wendelin/bug.h>
#include <string>
#include <vector> #include <vector>
#include <sys/types.h> #include <sys/types.h>
...@@ -165,10 +166,13 @@ struct rxPkt { ...@@ -165,10 +166,13 @@ struct rxPkt {
// XXX not e.g. string as chan<T> currently does not support types with // XXX not e.g. string as chan<T> currently does not support types with
// non-trivial copy. Note: we anyway need to limit line length to avoid DoS // non-trivial copy. Note: we anyway need to limit line length to avoid DoS
// but just for DoS the limit would be higher. // but just for DoS the limit would be higher.
char data[128-sizeof(StreamID)]; uint16_t datalen;
char data[128 - sizeof(StreamID) - sizeof(uint16_t)];
error from_string(const string& rx);
string to_string() const; string to_string() const;
}; };
static_assert(sizeof(rxPkt) == 128);
// WatchLink represents /head/watch link opened on wcfs. // WatchLink represents /head/watch link opened on wcfs.
...@@ -182,6 +186,7 @@ struct rxPkt { ...@@ -182,6 +186,7 @@ struct rxPkt {
class WatchLink { class WatchLink {
WCFS *_wc; WCFS *_wc;
os::File _f; // head/watch file handle os::File _f; // head/watch file handle
string _rxbuf; // buffer for read data from _f
chan<structZ> _rx_eof; // becomes ready when wcfs closes its tx side chan<structZ> _rx_eof; // becomes ready when wcfs closes its tx side
// inv.protocol message IO // inv.protocol message IO
...@@ -475,7 +480,7 @@ error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ? ...@@ -475,7 +480,7 @@ error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ?
while (1) { while (1) {
// NOTE: .close() makes sure .f.read*() will wake up // NOTE: .close() makes sure .f.read*() will wake up
tie(l, err) = wlink._readline(); tie(l, err) = wlink._readline(); // XXX +maxlen
if (err == io::EOF_) { // peer closed its tx if (err == io::EOF_) { // peer closed its tx
// XXX what happens on other errors? // XXX what happens on other errors?
wlink._rx_eof.close(); wlink._rx_eof.close();
...@@ -484,43 +489,24 @@ error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ? ...@@ -484,43 +489,24 @@ error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ?
return err; return err;
printf("C: watch : rx: %s", l.c_str()); printf("C: watch : rx: %s", l.c_str());
// <stream> ... \n err = pkt.from_string(l);
sp = l.find(' '); if (err != nil)
if (sp == string::npos) { return err;
// XXX invalid pkt: no sp
return fmt.errorf("invalid pkt: no SP");
}
sid = string(l, 0, sp);
smsg = string(l, sp+1);
sscanf(sid.c_str(), "%" SCNu64, &pkt.streamID);
snprintf(...., "%" PRIu64, pkt.streamID);
if (string(...) != sid)
return fmt.errorf("invalid pkt: invalid stream ID");
// XXX len(data) > pkt.data?
// XXX copy data -> pkt.data
stream, msg = l.split(' ', 1)
stream = int(stream)
msg = msg.rstrip('\n')
if (stream == 0) { // control/fatal message from wcfs if (pkt.stream == 0) { // control/fatal message from wcfs
// XXX print -> receive somewhere? XXX -> recvCtl ? // XXX print -> receive somewhere? XXX -> recvCtl ?
printf("C: watch : rx fatal: %r\n" % msg); printf("C: watch : rx fatal: %s\n", l.c_str());
wlink.fatalv.append(msg); //wlink.fatalv.append(msg);
continue; continue;
} }
bool reply = (stream % 2 != 0); bool reply = (pkt.stream % 2 != 0);
if (reply) { if (reply) {
chan<rxPkt> rxq; chan<rxPkt> rxq;
bool ok; bool ok;
wlink._rxmu.lock(); wlink._rxmu.lock();
tie(rxq, ok) = wlink._rxtab.pop(stream); tie(rxq, ok) = wlink._rxtab.pop(pkt.stream);
wlink._rxmu.unlock(); wlink._rxmu.unlock();
if (!ok) { if (!ok) {
// wcfs sent reply on unexpected stream // wcfs sent reply on unexpected stream
...@@ -530,25 +516,25 @@ error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ? ...@@ -530,25 +516,25 @@ error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ?
} }
int _ = select({ int _ = select({
ctx->done().recvs(), // 0 ctx->done().recvs(), // 0
rxq.sends(msg), // 1 rxq.sends(&pkt), // 1
}); });
if (_ == 0) if (_ == 0)
return ctx->err(); return ctx->err();
} }
else { else {
wlink._rxmu.lock(); wlink._rxmu.lock();
if (wlink._accepted.has(stream)) { if (wlink._accepted.has(pkt.stream)) {
wlink._rxmu.unlock(); wlink._rxmu.unlock();
// XXX log + down // XXX log + down
printf("wcfs sent request on already used stream\n"); printf("wcfs sent request on already used stream\n");
continue; continue;
} }
// XXX clear _accepted not to leak memory after reply is sent? // XXX clear _accepted not to leak memory after reply is sent?
wlink._accepted.insert(stream); wlink._accepted.insert(pkt.stream);
wlink._rxmu.unlock(); wlink._rxmu.unlock();
int _ = select({ int _ = select({
ctx->done().recvs(), // 0 ctx->done().recvs(), // 0
wlink._acceptq.sends((stream, msg)), // 1 wlink._acceptq.sends(&pkt), // 1
}); });
if (_ == 0) if (_ == 0)
return ctx->err(); return ctx->err();
...@@ -621,7 +607,7 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string ...@@ -621,7 +607,7 @@ tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string
wlink->_rxmu.lock(); wlink->_rxmu.lock();
if (wlink->_rxdown) { if (wlink->_rxdown) {
wlink->_rxmu.unlock(); wlink->_rxmu.unlock();
return EClosed; return make_tuple(nil, fmt::errorf("link is down"));
} }
if (wlink->_rxtab.has(stream)) { if (wlink->_rxtab.has(stream)) {
wlink->_rxmu.unlock(); wlink->_rxmu.unlock();
...@@ -649,16 +635,18 @@ tuple<string, error> WatchLink::_readline() { ...@@ -649,16 +635,18 @@ tuple<string, error> WatchLink::_readline() {
WatchLink& wlink = *this; WatchLink& wlink = *this;
char buf[128]; char buf[128];
pos = 0; // XXX naming size_t nl_searchfrom = 0;
while (1) { while (1) {
nl = wlink._rxbuf.find('\n', pos); auto nl = wlink._rxbuf.find('\n', nl_searchfrom);
if (nl != string::npos) { if (nl != string::npos) {
line = string(wlink._rxbuf, 0, nl+1); auto line = string(wlink._rxbuf, 0, nl+1);
wlink._rxbuf = string(wlink._rxbuf, nl+1); wlink._rxbuf = string(wlink._rxbuf, nl+1);
return make_tuple(line, nil); return make_tuple(line, nil);
} }
pos = wlink._rxbuf.length(); nl_searchfrom = wlink._rxbuf.length();
int n;
error err;
tie(n, err) = wlink._f.read(buf, sizeof(buf)); tie(n, err) = wlink._f.read(buf, sizeof(buf));
if (n > 0) { if (n > 0) {
// XXX limit line length to avoid DoS // XXX limit line length to avoid DoS
...@@ -673,6 +661,36 @@ tuple<string, error> WatchLink::_readline() { ...@@ -673,6 +661,36 @@ tuple<string, error> WatchLink::_readline() {
} }
} }
// from_string parses string into rxPkt.
error rxPkt::from_string(const string &rx) {
rxPkt& pkt = *this;
// <stream> ... \n
auto sp = rx.find(' ');
if (sp == string::npos)
return fmt::errorf("invalid pkt: no SP");
string sid = string(rx, 0, sp);
string smsg = string(rx, sp+1);
sscanf(sid.c_str(), "%" SCNu64, &pkt.stream);
if (std::to_string(pkt.stream) != sid)
return fmt::errorf("invalid pkt: invalid stream ID");
auto msglen = smsg.length();
if (msglen > ARRAY_SIZE(pkt.data))
return fmt::errorf("invalid pkt: len(msg) > %zu", ARRAY_SIZE(pkt.data));
memcpy(pkt.data, smsg.c_str(), msglen);
pkt.datalen = msglen;
return nil;
}
// to_string converts rxPkt data into string.
string rxPkt::to_string() const {
const rxPkt& pkt = *this;
return string(pkt.data, pkt.datalen);
}
// ---- WCFS raw file access ---- // ---- WCFS raw file access ----
// _path returns path for object on wcfs. // _path returns path for object on wcfs.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment