Commit d34c4610 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 09310101
...@@ -235,6 +235,7 @@ setup( ...@@ -235,6 +235,7 @@ setup(
PyGoExt('wcfs.internal._wcfs', PyGoExt('wcfs.internal._wcfs',
['wcfs/internal/_wcfs.pyx', ['wcfs/internal/_wcfs.pyx',
'wcfs/internal/wcfs_virtmem.cpp', 'wcfs/internal/wcfs_virtmem.cpp',
'wcfs/internal/wcfs_watchlink.cpp',
'wcfs/internal/wcfs_misc.cpp', 'wcfs/internal/wcfs_misc.cpp',
], ],
include_dirs = [ # XXX -> common place include_dirs = [ # XXX -> common place
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#define _NXD_WCFS_MISC_H_ #define _NXD_WCFS_MISC_H_
#include <stddef.h> #include <stddef.h>
#include <stdint.h>
#include <string> #include <string>
using std::string; using std::string;
...@@ -152,6 +153,59 @@ vector<string> split(const string &s, char sep); ...@@ -152,6 +153,59 @@ vector<string> split(const string &s, char sep);
} // strings:: } // strings::
// XXX ok?
struct IContext {
virtual chan<structZ> done() = 0;
virtual error err() = 0;
};
// context::
namespace context {
struct _Background : IContext {
chan<structZ> done() {
return nil;
}
error err() {
return nil;
}
};
static _Background _bg;
// XXX doc
IContext* background() {
return &_bg; // NOTE nil is not valid in C++ (IContext* also carries vtab ptr)
}
// XXX doc
const error canceled = fmt::errorf("context canceled");
// XXX deadline exceeded?
} // context::
#if 0
interface(Context) {
ifunc(chan<structZ> done());
ifunc(error err());
};
I<io::Reader>(f)
// XXX wrap T* as IContext
template<typename T>
class Context : public IContext {
T *obj;
public:
Context(T *obj) : obj(obj) {}
chan<structZ> done() { return obj->done(); }
error err() { return obj->err(); }
};
#endif
// ---- misc ---- // ---- misc ----
#include <unordered_map> #include <unordered_map>
...@@ -205,6 +259,15 @@ tuple<uint64_t, error> parseHex64(const string& s); ...@@ -205,6 +259,15 @@ tuple<uint64_t, error> parseHex64(const string& s);
tuple<int64_t, error> parseInt(const string& s); tuple<int64_t, error> parseInt(const string& s);
tuple<uint64_t, error> parseUint(const string& s); tuple<uint64_t, error> parseUint(const string& s);
} // xstrconv } // xstrconv::
// zodb::
namespace zodb {
typedef uint64_t Tid;
typedef uint64_t Oid;
} // zodb::
#endif #endif
...@@ -42,78 +42,23 @@ using namespace golang; ...@@ -42,78 +42,23 @@ using namespace golang;
#include <sys/mman.h> #include <sys/mman.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <stdint.h>
#include "wcfs_watchlink.h"
#include "wcfs_misc.h" #include "wcfs_misc.h"
using std::min; using std::min;
using std::vector; using std::vector;
typedef uint64_t Tid;
typedef uint64_t Oid;
// TidHead is invalid Tid which is largest Tid value and means @head. // TidHead is invalid Tid which is largest Tid value and means @head.
const Tid TidHead = -1ULL; const zodb::Tid TidHead = -1ULL;
static string h(uint64_t v); // v -> 016x hex representation static string h(uint64_t v); // v -> 016x hex representation
#define h_(v) (h(v).c_str()) #define h_(v) (h(v).c_str())
static error mmap_zero_into_ro(void *addr, size_t size); static error mmap_zero_into_ro(void *addr, size_t size);
static error mmap_into_ro(void *addr, size_t size, const os::File &f, off_t offset); static error mmap_into_ro(void *addr, size_t size, const os::File &f, off_t offset);
// XXX ok?
struct IContext {
virtual chan<structZ> done() = 0;
virtual error err() = 0;
};
// context::
namespace context {
struct _Background : IContext {
chan<structZ> done() {
return nil;
}
error err() {
return nil;
}
};
static _Background _bg;
// XXX doc
IContext* background() {
return &_bg; // NOTE nil is not valid in C++ (IContext* also carries vtab ptr)
}
// XXX doc
const error canceled = fmt::errorf("context canceled");
// XXX deadline exceeded?
} // context::
#if 0
interface(Context) {
ifunc(chan<structZ> done());
ifunc(error err());
};
I<io::Reader>(f)
// XXX wrap T* as IContext
template<typename T>
class Context : public IContext {
T *obj;
public:
Context(T *obj) : obj(obj) {}
chan<structZ> done() { return obj->done(); }
error err() { return obj->err(); }
};
#endif
struct Conn; struct Conn;
struct _File; struct _File;
struct _Mapping; struct _Mapping;
...@@ -125,7 +70,7 @@ struct PinReq; ...@@ -125,7 +70,7 @@ struct PinReq;
struct WCFS { struct WCFS {
string mountpoint; string mountpoint;
tuple<Conn*, error> connect(Tid at); tuple<Conn*, error> connect(zodb::Tid at);
string _path(const string &obj); string _path(const string &obj);
tuple<os::File, error> _open(const string &path, int flags=O_RDONLY); tuple<os::File, error> _open(const string &path, int flags=O_RDONLY);
tuple<WatchLink*, error> _openwatch(); tuple<WatchLink*, error> _openwatch();
...@@ -137,15 +82,15 @@ struct WCFS { ...@@ -137,15 +82,15 @@ struct WCFS {
// XXX doc // XXX doc
struct Conn { struct Conn {
WCFS *_wc; WCFS *_wc;
Tid at; zodb::Tid at;
WatchLink *_wlink; WatchLink *_wlink;
sync::Mutex _filemu; sync::Mutex _filemu;
dict<Oid, _File*> _filetab; // {} foid -> _file dict<zodb::Oid, _File*> _filetab; // {} foid -> _file
public: public:
error close(); error close();
error resync(Tid at); error resync(zodb::Tid at);
private: private:
void _pinner(IContext *ctx); void _pinner(IContext *ctx);
...@@ -157,13 +102,13 @@ private: ...@@ -157,13 +102,13 @@ private:
// XXX doc XXX naming -> _FileView ? // XXX doc XXX naming -> _FileView ?
struct _File { struct _File {
Conn *wconn; Conn *wconn;
Oid foid; // hex of ZBigFile root object ID zodb::Oid foid; // hex of ZBigFile root object ID
size_t blksize; // block size of this file XXX -> off_t ? size_t blksize; // block size of this file XXX -> off_t ?
os::File headf; // file object of head/file os::File headf; // file object of head/file
off_t headfsize; // head/file size is known to be at least headfsize (size ↑=) off_t headfsize; // head/file size is known to be at least headfsize (size ↑=)
dict<int64_t, Tid> pinned; // {} blk -> rev that wcfs already sent us for this file dict<int64_t, zodb::Tid> pinned; // {} blk -> rev that wcfs already sent us for this file
vector<_Mapping*> mmaps; // []_Mapping ↑blk_start mappings of this file vector<_Mapping*> mmaps; // []_Mapping ↑blk_start mappings of this file
}; };
// _Mapping represents one mapping of _File. // _Mapping represents one mapping of _File.
...@@ -181,88 +126,12 @@ struct _Mapping { ...@@ -181,88 +126,12 @@ struct _Mapping {
return blk_start + (mem_stop - mem_start) / file->blksize; return blk_start + (mem_stop - mem_start) / file->blksize;
} }
error _remmapblk(int64_t blk, Tid at); error _remmapblk(int64_t blk, zodb::Tid at);
};
// StreamID stands for ID of a stream multiplexed over WatchLink.
typedef uint64_t StreamID;
// rxPkt internally represents data of one message received over WatchLink.
struct rxPkt {
// stream over which the data was received
StreamID stream;
// raw data received/to-be-sent.
// XXX not e.g. string as chan<T> currently does not support types with
// non-trivial copy. Note: we anyway need to limit rx line length to
// avoid DoS, but just for DoS the limit would be higher.
uint16_t datalen;
char data[128 - sizeof(StreamID) - sizeof(uint16_t)];
error from_string(const string& rx);
string to_string() const;
};
static_assert(sizeof(rxPkt) == 128);
// WatchLink represents /head/watch link opened on wcfs.
//
// It is created by WCFS::_openwatch().
//
// .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
// .close() closes the link.
//
// It is safe to use WatchLink from multiple threads simultaneously.
class WatchLink {
WCFS *_wc;
os::File _f; // head/watch file handle
string _rxbuf; // buffer for read data from _f
chan<structZ> _rx_eof; // becomes ready when wcfs closes its tx side
// inv.protocol message IO
chan<rxPkt> _acceptq; // server originated messages go here
sync::Mutex _rxmu;
bool _rxdown;
dict<StreamID, chan<rxPkt>>
_rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet
StreamID _req_next; // stream ID for next client-originated request XXX -> atomic
sync::Mutex _txmu; // serializes writes
sync::Once _txclose1;
#if 0
func() _serveCancel
sync.WorkGroup *_serveWG
#endif
public:
friend tuple<WatchLink*, error> WCFS::_openwatch();
error close();
error recvReq(IContext *ctx, PinReq *rx_into);
tuple<string, error> sendReq(IContext *ctx, const string &req);
private:
void _closeTX();
error _serveRX(IContext *ctx);
tuple<string, error> _readline();
error _send(StreamID stream, const string &msg);
error _write(const string &pkt);
tuple<chan<rxPkt>, error> _sendReq(IContext *ctx, const string &req);
};
// PinReq represents 1 server-initiated wcfs pin request received over /head/watch link.
struct PinReq {
StreamID stream; // request was received with this stream ID
Oid foid; // request is about this file
int64_t blk; // ----//---- about this block
Tid at; // pin to this at; TidHead means unpin to head
}; };
// connect creates new Conn viewing WCFS state as of @at. // connect creates new Conn viewing WCFS state as of @at.
tuple<Conn*, error> WCFS::connect(Tid at) { tuple<Conn*, error> WCFS::connect(zodb::Tid at) {
WCFS *wc = this; WCFS *wc = this;
// XXX err ctx // XXX err ctx
...@@ -398,15 +267,15 @@ void Conn::_pin1(PinReq *req) { ...@@ -398,15 +267,15 @@ void Conn::_pin1(PinReq *req) {
// XXX Conn::mmap // XXX Conn::mmap
// resync resyncs connection and its mappings onto different database view. // resync resyncs connection and its mappings onto different database view.
error Conn::resync(Tid at) { error Conn::resync(zodb::Tid at) {
Conn &wconn = *this; Conn &wconn = *this;
// XXX err ctx // XXX err ctx
// XXX locking // XXX locking
for (auto fit : wconn._filetab) { for (auto fit : wconn._filetab) {
Oid foid = fit.first; zodb::Oid foid = fit.first;
_File &f = *fit.second; _File &f = *fit.second;
// XXX if file has no mappings and was not used during whole prev // XXX if file has no mappings and was not used during whole prev
// cycle - forget and stop watching it // cycle - forget and stop watching it
...@@ -461,7 +330,7 @@ error Conn::resync(Tid at) { ...@@ -461,7 +330,7 @@ error Conn::resync(Tid at) {
// //
// at=TidHead means unpin to head/ . // at=TidHead means unpin to head/ .
// NOTE this does not check whether virtmem already mapped blk as RW. // NOTE this does not check whether virtmem already mapped blk as RW.
error _Mapping::_remmapblk(int64_t blk, Tid at) { error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) {
// XXX err context? blk #<blk> @<at> // XXX err context? blk #<blk> @<at>
_Mapping *mmap = this; _Mapping *mmap = this;
...@@ -518,374 +387,6 @@ error _Mapping::_remmapblk(int64_t blk, Tid at) { ...@@ -518,374 +387,6 @@ error _Mapping::_remmapblk(int64_t blk, Tid at) {
// XXX _Mapping::unmap // XXX _Mapping::unmap
// ---- WatchLink ----
// _openwatch opens new watch link on wcfs.
tuple<WatchLink*, error> WCFS::_openwatch() {
WCFS *wc = this;
// head/watch handle.
os::File f;
error err;
tie(f, err) = wc->_open("head/watch", O_RDWR);
if (err != nil)
return make_tuple((WatchLink *)NULL, err);
WatchLink *wlink = new(WatchLink);
wlink->_wc = wc;
wlink->_f = f;
wlink->_rx_eof = makechan<structZ>();
wlink->_acceptq = makechan<rxPkt>();
wlink->_rxdown = false;
wlink->_req_next = 1;
#if 0
serveCtx, wlink._serveCancel = context.with_cancel(context.background())
wlink->_serveWG = sync.WorkGroup(serveCtx)
wlink->_serveWG.go(wlink._serveRX)
#endif
return make_tuple(wlink, nil);
}
void WatchLink::_closeTX() {
WatchLink &wlink = *this;
wlink._txclose1.do_([&]() {
// ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
// _serveRX on client (= on us). The connection can be already closed
// by wcfs - so ignore errors when sending bye.
(void)wlink._send(1, "bye"); // XXX stream ok?
// XXX vvv should be ~ shutdown(TX, wlink._f), however shutdown does
// not work for non-socket file descriptors. And even if we dup link
// fd, and close only one used for TX, peer's RX will still be blocked
// ad fds are referring to one file object which stays in opened
// state. So just use ^^^ "bye" as "TX closed" message.
// wlink._wtx.close();
});
}
// close closes the link.
error WatchLink::close() {
WatchLink& wlink = *this;
wlink._closeTX();
#if 0
wlink._serveCancel();
// XXX we can get stuck here if wcfs does not behave as we want.
// XXX in particular if there is a silly - e.g. syntax or type error in
// test code - we currently get stuck here.
//
// XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour?
// XXX -> we now have `kill -QUIT` to wcfs.go on test timeout - remove ^^^ comments?
error err = wlink._serveWG.wait();
// canceled is expected and ok
if (err == context.canceled)
err = nil;
#else
error err = nil;
#endif
error err2 = wlink._f.close();
if (err == nil)
err = err2;
return err;
}
// _serveRX receives messages from ._f and dispatches them according to streamID.
error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ?
WatchLink& wlink = *this;
// when finishing - wakeup everyone waiting for rx
defer([&]() {
wlink._acceptq.close();
wlink._rxmu.lock();
wlink._rxdown = true; // don't allow new rxtab registers
wlink._rxmu.unlock();
for (auto _ : wlink._rxtab) {
auto rxq = _.second;
rxq.close();
}
});
string l;
error err;
rxPkt pkt;
while (1) {
// NOTE: .close() makes sure .f.read*() will wake up
tie(l, err) = wlink._readline(); // XXX +maxlen
if (err == io::EOF_) { // peer closed its tx
// XXX what happens on other errors?
wlink._rx_eof.close();
}
if (err != nil)
return err;
printf("C: watch : rx: %s", l.c_str());
err = pkt.from_string(l);
if (err != nil)
return err;
if (pkt.stream == 0) { // control/fatal message from wcfs
// XXX print -> receive somewhere? XXX -> recvCtl ?
printf("C: watch : rx fatal: %s\n", l.c_str());
//wlink.fatalv.append(msg);
continue;
}
bool reply = (pkt.stream % 2 != 0);
if (reply) {
chan<rxPkt> rxq;
bool ok;
wlink._rxmu.lock();
tie(rxq, ok) = wlink._rxtab.pop(pkt.stream);
wlink._rxmu.unlock();
if (!ok) {
// wcfs sent reply on unexpected stream
// XXX log + dowmn.
printf("wcfs sent reply on unexpected stream\n");
continue;
}
int _ = select({
ctx->done().recvs(), // 0
rxq.sends(&pkt), // 1
});
if (_ == 0)
return ctx->err();
}
else {
wlink._rxmu.lock();
if (wlink._accepted.has(pkt.stream)) {
wlink._rxmu.unlock();
// XXX log + down
printf("wcfs sent request on already used stream\n");
continue;
}
// XXX clear _accepted not to leak memory after reply is sent?
wlink._accepted.insert(pkt.stream);
wlink._rxmu.unlock();
int _ = select({
ctx->done().recvs(), // 0
wlink._acceptq.sends(&pkt), // 1
});
if (_ == 0)
return ctx->err();
}
}
}
// _send sends raw message via specified stream.
//
// multiple _send can be called in parallel - _send serializes writes.
// XXX +ctx?
error WatchLink::_send(StreamID stream, const string &msg) {
WatchLink *wlink = this;
if (msg.find('\n') != string::npos)
panic("msg has \\n");
string pkt = fmt::sprintf("%lu %s\n", stream, msg.c_str());
return wlink->_write(pkt);
}
error WatchLink::_write(const string &pkt) {
WatchLink *wlink = this;
wlink->_txmu.lock();
defer([&]() {
wlink->_txmu.unlock();
});
//printf('C: watch : tx: %r' % pkt)
int n;
error err;
tie(n, err) = wlink->_f.write(pkt.c_str(), pkt.size());
return err;
}
// sendReq sends client -> server request and returns server reply.
// XXX -> reply | None when EOF
tuple<string, error> WatchLink::sendReq(IContext *ctx, const string &req) {
WatchLink *wlink = this;
// XXX err ctx
rxPkt rx;
chan<rxPkt> rxq;
error err;
tie(rxq, err) = wlink->_sendReq(ctx, req);
// XXX err
int _ = select({
ctx->done().recvs(), // 0
rxq.recvs(&rx), // 1
});
if (_ == 0)
return make_tuple("", ctx->err());
// XXX check for EOF
string reply = rx.to_string();
return make_tuple(reply, nil);
}
tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string &req) {
WatchLink *wlink = this;
// XXX err ctx?
wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock)
StreamID stream = wlink->_req_next;
wlink->_req_next = (wlink->_req_next + 2); // wraparound at uint64 max
wlink->_txmu.unlock();
auto rxq = makechan<rxPkt>(1);
wlink->_rxmu.lock();
if (wlink->_rxdown) {
wlink->_rxmu.unlock();
return make_tuple(nil, fmt::errorf("link is down"));
}
if (wlink->_rxtab.has(stream)) {
wlink->_rxmu.unlock();
panic("BUG: to-be-sent stream is present in rxtab");
}
wlink->_rxtab[stream] = rxq;
wlink->_rxmu.unlock();
error err = wlink->_send(stream, req);
if (err != nil) {
// remove rxq from rxtab
wlink->_rxmu.lock();
wlink->_rxtab.erase(stream);
wlink->_rxmu.unlock();
// no need to drain rxq - it was created with cap=1
rxq = nil;
}
return make_tuple(rxq, err);
}
// recvReq receives client <- server request.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt);
error WatchLink::recvReq(IContext *ctx, PinReq *prx) {
WatchLink& wlink = *this;
// XXX err ctx?
rxPkt pkt;
bool ok;
int _ = select({
ctx->done().recvs(), // 0
wlink._acceptq.recvs(&pkt, &ok), // 1
});
if (_ == 0)
return ctx->err();
if (!ok)
return io::EOF_;
pkt.to_string();
return _parsePinReq(prx, &pkt);
}
// _parsePinReq parses message into PinReq according to wcfs invalidation protocol.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt) {
// XXX err ctx "bad pin"
pin->stream = pkt->stream;
auto msg = pkt->to_string();
// pin <foid>) #<blk> @<at>
if (!strings::has_prefix(msg, "pin "))
return fmt::errorf("not a pin request"); // XXX +msg?
auto argv = strings::split(msg.substr(4), ' ');
if (argv.size() != 3)
return fmt::errorf("expected 3 arguments, got %zd", argv.size());
error err;
tie(pin->foid, err) = xstrconv::parseHex64(argv[0]);
if (err != nil)
return fmt::errorf("invalid foid");
if (!strings::has_prefix(argv[1], '#'))
return fmt::errorf("invalid blk");
tie(pin->blk, err) = xstrconv::parseInt(argv[1].substr(1));
if (err != nil)
return fmt::errorf("invalid blk");
if (!strings::has_prefix(argv[2], '@'))
return fmt::errorf("invalid at");
auto at = argv[2].substr(1);
if (at == "head") {
pin->at = TidHead;
} else {
tie(pin->at, err) = xstrconv::parseHex64(at);
if (err != nil)
return fmt::errorf("invalid at");
}
return nil;
}
// _readline reads next raw line sent from wcfs.
tuple<string, error> WatchLink::_readline() {
WatchLink& wlink = *this;
char buf[128];
size_t nl_searchfrom = 0;
while (1) {
auto nl = wlink._rxbuf.find('\n', nl_searchfrom);
if (nl != string::npos) {
auto line = wlink._rxbuf.substr(0, nl+1);
wlink._rxbuf = wlink._rxbuf.substr(nl+1);
return make_tuple(line, nil);
}
nl_searchfrom = wlink._rxbuf.length();
int n;
error err;
tie(n, err) = wlink._f.read(buf, sizeof(buf));
if (n > 0) {
// XXX limit line length to avoid DoS
wlink._rxbuf += string(buf, n);
continue;
}
if (err == nil)
panic("read returned (0, nil)");
if (err == io::EOF_ && wlink._rxbuf.length() != 0)
err = io::ErrUnexpectedEOF;
return make_tuple("", err);
}
}
// from_string parses string into rxPkt.
error rxPkt::from_string(const string &rx) {
rxPkt& pkt = *this;
// <stream> ... \n
auto sp = rx.find(' ');
if (sp == string::npos)
return fmt::errorf("invalid pkt: no SP");
string sid = rx.substr(0, sp);
string smsg = rx.substr(sp+1);
error err;
tie(pkt.stream, err) = xstrconv::parseUint(sid);
if (err != nil)
return fmt::errorf("invalid pkt: invalid stream ID");
auto msglen = smsg.length();
if (msglen > ARRAY_SIZE(pkt.data))
return fmt::errorf("invalid pkt: len(msg) > %zu", ARRAY_SIZE(pkt.data));
memcpy(pkt.data, smsg.c_str(), msglen);
pkt.datalen = msglen;
return nil;
}
// to_string converts rxPkt data into string.
string rxPkt::to_string() const {
const rxPkt& pkt = *this;
return string(pkt.data, pkt.datalen);
}
// ---- WCFS raw file access ---- // ---- WCFS raw file access ----
// _path returns path for object on wcfs. // _path returns path for object on wcfs.
......
// Copyright (C) 2018-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
#include "wcfs_watchlink.h"
// _openwatch opens new watch link on wcfs.
tuple<WatchLink*, error> WCFS::_openwatch() {
WCFS *wc = this;
// head/watch handle.
os::File f;
error err;
tie(f, err) = wc->_open("head/watch", O_RDWR);
if (err != nil)
return make_tuple((WatchLink *)NULL, err);
WatchLink *wlink = new(WatchLink);
wlink->_wc = wc;
wlink->_f = f;
wlink->_rx_eof = makechan<structZ>();
wlink->_acceptq = makechan<rxPkt>();
wlink->_rxdown = false;
wlink->_req_next = 1;
#if 0
serveCtx, wlink._serveCancel = context.with_cancel(context.background())
wlink->_serveWG = sync.WorkGroup(serveCtx)
wlink->_serveWG.go(wlink._serveRX)
#endif
return make_tuple(wlink, nil);
}
void WatchLink::_closeTX() {
WatchLink &wlink = *this;
wlink._txclose1.do_([&]() {
// ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
// _serveRX on client (= on us). The connection can be already closed
// by wcfs - so ignore errors when sending bye.
(void)wlink._send(1, "bye"); // XXX stream ok?
// XXX vvv should be ~ shutdown(TX, wlink._f), however shutdown does
// not work for non-socket file descriptors. And even if we dup link
// fd, and close only one used for TX, peer's RX will still be blocked
// ad fds are referring to one file object which stays in opened
// state. So just use ^^^ "bye" as "TX closed" message.
// wlink._wtx.close();
});
}
// close closes the link.
error WatchLink::close() {
WatchLink& wlink = *this;
wlink._closeTX();
#if 0
wlink._serveCancel();
// XXX we can get stuck here if wcfs does not behave as we want.
// XXX in particular if there is a silly - e.g. syntax or type error in
// test code - we currently get stuck here.
//
// XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour?
// XXX -> we now have `kill -QUIT` to wcfs.go on test timeout - remove ^^^ comments?
error err = wlink._serveWG.wait();
// canceled is expected and ok
if (err == context.canceled)
err = nil;
#else
error err = nil;
#endif
error err2 = wlink._f.close();
if (err == nil)
err = err2;
return err;
}
// _serveRX receives messages from ._f and dispatches them according to streamID.
error WatchLink::_serveRX(IContext *ctx) { // XXX error -> where ?
WatchLink& wlink = *this;
// when finishing - wakeup everyone waiting for rx
defer([&]() {
wlink._acceptq.close();
wlink._rxmu.lock();
wlink._rxdown = true; // don't allow new rxtab registers
wlink._rxmu.unlock();
for (auto _ : wlink._rxtab) {
auto rxq = _.second;
rxq.close();
}
});
string l;
error err;
rxPkt pkt;
while (1) {
// NOTE: .close() makes sure .f.read*() will wake up
tie(l, err) = wlink._readline(); // XXX +maxlen
if (err == io::EOF_) { // peer closed its tx
// XXX what happens on other errors?
wlink._rx_eof.close();
}
if (err != nil)
return err;
printf("C: watch : rx: %s", l.c_str());
err = pkt.from_string(l);
if (err != nil)
return err;
if (pkt.stream == 0) { // control/fatal message from wcfs
// XXX print -> receive somewhere? XXX -> recvCtl ?
printf("C: watch : rx fatal: %s\n", l.c_str());
//wlink.fatalv.append(msg);
continue;
}
bool reply = (pkt.stream % 2 != 0);
if (reply) {
chan<rxPkt> rxq;
bool ok;
wlink._rxmu.lock();
tie(rxq, ok) = wlink._rxtab.pop(pkt.stream);
wlink._rxmu.unlock();
if (!ok) {
// wcfs sent reply on unexpected stream
// XXX log + dowmn.
printf("wcfs sent reply on unexpected stream\n");
continue;
}
int _ = select({
ctx->done().recvs(), // 0
rxq.sends(&pkt), // 1
});
if (_ == 0)
return ctx->err();
}
else {
wlink._rxmu.lock();
if (wlink._accepted.has(pkt.stream)) {
wlink._rxmu.unlock();
// XXX log + down
printf("wcfs sent request on already used stream\n");
continue;
}
// XXX clear _accepted not to leak memory after reply is sent?
wlink._accepted.insert(pkt.stream);
wlink._rxmu.unlock();
int _ = select({
ctx->done().recvs(), // 0
wlink._acceptq.sends(&pkt), // 1
});
if (_ == 0)
return ctx->err();
}
}
}
// _send sends raw message via specified stream.
//
// multiple _send can be called in parallel - _send serializes writes.
// XXX +ctx?
error WatchLink::_send(StreamID stream, const string &msg) {
WatchLink *wlink = this;
if (msg.find('\n') != string::npos)
panic("msg has \\n");
string pkt = fmt::sprintf("%lu %s\n", stream, msg.c_str());
return wlink->_write(pkt);
}
error WatchLink::_write(const string &pkt) {
WatchLink *wlink = this;
wlink->_txmu.lock();
defer([&]() {
wlink->_txmu.unlock();
});
//printf('C: watch : tx: %r' % pkt)
int n;
error err;
tie(n, err) = wlink->_f.write(pkt.c_str(), pkt.size());
return err;
}
// sendReq sends client -> server request and returns server reply.
// XXX -> reply | None when EOF
tuple<string, error> WatchLink::sendReq(IContext *ctx, const string &req) {
WatchLink *wlink = this;
// XXX err ctx
rxPkt rx;
chan<rxPkt> rxq;
error err;
tie(rxq, err) = wlink->_sendReq(ctx, req);
// XXX err
int _ = select({
ctx->done().recvs(), // 0
rxq.recvs(&rx), // 1
});
if (_ == 0)
return make_tuple("", ctx->err());
// XXX check for EOF
string reply = rx.to_string();
return make_tuple(reply, nil);
}
tuple</*rxq*/chan<rxPkt>, error> WatchLink::_sendReq(IContext *ctx, const string &req) {
WatchLink *wlink = this;
// XXX err ctx?
wlink->_txmu.lock(); // XXX -> atomic (currently uses arbitrary lock)
StreamID stream = wlink->_req_next;
wlink->_req_next = (wlink->_req_next + 2); // wraparound at uint64 max
wlink->_txmu.unlock();
auto rxq = makechan<rxPkt>(1);
wlink->_rxmu.lock();
if (wlink->_rxdown) {
wlink->_rxmu.unlock();
return make_tuple(nil, fmt::errorf("link is down"));
}
if (wlink->_rxtab.has(stream)) {
wlink->_rxmu.unlock();
panic("BUG: to-be-sent stream is present in rxtab");
}
wlink->_rxtab[stream] = rxq;
wlink->_rxmu.unlock();
error err = wlink->_send(stream, req);
if (err != nil) {
// remove rxq from rxtab
wlink->_rxmu.lock();
wlink->_rxtab.erase(stream);
wlink->_rxmu.unlock();
// no need to drain rxq - it was created with cap=1
rxq = nil;
}
return make_tuple(rxq, err);
}
// recvReq receives client <- server request.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt);
error WatchLink::recvReq(IContext *ctx, PinReq *prx) {
WatchLink& wlink = *this;
// XXX err ctx?
rxPkt pkt;
bool ok;
int _ = select({
ctx->done().recvs(), // 0
wlink._acceptq.recvs(&pkt, &ok), // 1
});
if (_ == 0)
return ctx->err();
if (!ok)
return io::EOF_;
pkt.to_string();
return _parsePinReq(prx, &pkt);
}
// _parsePinReq parses message into PinReq according to wcfs invalidation protocol.
static error _parsePinReq(PinReq *pin, const rxPkt *pkt) {
// XXX err ctx "bad pin"
pin->stream = pkt->stream;
auto msg = pkt->to_string();
// pin <foid>) #<blk> @<at>
if (!strings::has_prefix(msg, "pin "))
return fmt::errorf("not a pin request"); // XXX +msg?
auto argv = strings::split(msg.substr(4), ' ');
if (argv.size() != 3)
return fmt::errorf("expected 3 arguments, got %zd", argv.size());
error err;
tie(pin->foid, err) = xstrconv::parseHex64(argv[0]);
if (err != nil)
return fmt::errorf("invalid foid");
if (!strings::has_prefix(argv[1], '#'))
return fmt::errorf("invalid blk");
tie(pin->blk, err) = xstrconv::parseInt(argv[1].substr(1));
if (err != nil)
return fmt::errorf("invalid blk");
if (!strings::has_prefix(argv[2], '@'))
return fmt::errorf("invalid at");
auto at = argv[2].substr(1);
if (at == "head") {
pin->at = TidHead;
} else {
tie(pin->at, err) = xstrconv::parseHex64(at);
if (err != nil)
return fmt::errorf("invalid at");
}
return nil;
}
// _readline reads next raw line sent from wcfs.
tuple<string, error> WatchLink::_readline() {
WatchLink& wlink = *this;
char buf[128];
size_t nl_searchfrom = 0;
while (1) {
auto nl = wlink._rxbuf.find('\n', nl_searchfrom);
if (nl != string::npos) {
auto line = wlink._rxbuf.substr(0, nl+1);
wlink._rxbuf = wlink._rxbuf.substr(nl+1);
return make_tuple(line, nil);
}
nl_searchfrom = wlink._rxbuf.length();
int n;
error err;
tie(n, err) = wlink._f.read(buf, sizeof(buf));
if (n > 0) {
// XXX limit line length to avoid DoS
wlink._rxbuf += string(buf, n);
continue;
}
if (err == nil)
panic("read returned (0, nil)");
if (err == io::EOF_ && wlink._rxbuf.length() != 0)
err = io::ErrUnexpectedEOF;
return make_tuple("", err);
}
}
// from_string parses string into rxPkt.
error rxPkt::from_string(const string &rx) {
rxPkt& pkt = *this;
// <stream> ... \n
auto sp = rx.find(' ');
if (sp == string::npos)
return fmt::errorf("invalid pkt: no SP");
string sid = rx.substr(0, sp);
string smsg = rx.substr(sp+1);
error err;
tie(pkt.stream, err) = xstrconv::parseUint(sid);
if (err != nil)
return fmt::errorf("invalid pkt: invalid stream ID");
auto msglen = smsg.length();
if (msglen > ARRAY_SIZE(pkt.data))
return fmt::errorf("invalid pkt: len(msg) > %zu", ARRAY_SIZE(pkt.data));
memcpy(pkt.data, smsg.c_str(), msglen);
pkt.datalen = msglen;
return nil;
}
// to_string converts rxPkt data into string.
string rxPkt::to_string() const {
const rxPkt& pkt = *this;
return string(pkt.data, pkt.datalen);
}
// Copyright (C) 2018-2019 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// wcfs_watchlink provides WatchLink class that implements message exchange
// over /head/watch.
#ifndef _NXD_WCFS_WATCHLINK_H_
#define _NXD_WCFS_WATCHLINK_H_
#include "wcfs_misc.h"
struct WCFS;
struct PinReq;
// StreamID stands for ID of a stream multiplexed over WatchLink.
typedef uint64_t StreamID;
// rxPkt internally represents data of one message received over WatchLink.
struct rxPkt {
// stream over which the data was received
StreamID stream;
// raw data received/to-be-sent.
// XXX not e.g. string as chan<T> currently does not support types with
// non-trivial copy. Note: we anyway need to limit rx line length to
// avoid DoS, but just for DoS the limit would be higher.
uint16_t datalen;
char data[128 - sizeof(StreamID) - sizeof(uint16_t)];
error from_string(const string& rx);
string to_string() const;
};
static_assert(sizeof(rxPkt) == 128);
// WatchLink represents /head/watch link opened on wcfs.
//
// It is created by WCFS::_openwatch().
//
// .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
// .close() closes the link.
//
// It is safe to use WatchLink from multiple threads simultaneously.
class WatchLink {
WCFS *_wc;
os::File _f; // head/watch file handle
string _rxbuf; // buffer for read data from _f
chan<structZ> _rx_eof; // becomes ready when wcfs closes its tx side
// inv.protocol message IO
chan<rxPkt> _acceptq; // server originated messages go here
sync::Mutex _rxmu;
bool _rxdown;
dict<StreamID, chan<rxPkt>>
_rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet
StreamID _req_next; // stream ID for next client-originated request XXX -> atomic
sync::Mutex _txmu; // serializes writes
sync::Once _txclose1;
#if 0
func() _serveCancel
sync.WorkGroup *_serveWG
#endif
public:
//friend tuple<WatchLink*, error> WCFS::_openwatch();
error close();
error recvReq(IContext *ctx, PinReq *rx_into);
tuple<string, error> sendReq(IContext *ctx, const string &req);
private:
void _closeTX();
error _serveRX(IContext *ctx);
tuple<string, error> _readline();
error _send(StreamID stream, const string &msg);
error _write(const string &pkt);
tuple<chan<rxPkt>, error> _sendReq(IContext *ctx, const string &req);
};
// PinReq represents 1 server-initiated wcfs pin request received over /head/watch link.
struct PinReq {
StreamID stream; // request was received with this stream ID
zodb::Oid foid; // request is about this file
int64_t blk; // ----//---- about this block
zodb::Tid at; // pin to this at; TidHead means unpin to head
};
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment