Commit 4e35a491 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3db68ece
......@@ -67,7 +67,7 @@ error File::stat(struct stat *st) {
}
// _errno returns error corresponding to file->op and errno.
// _errno returns error corresponding to op(file) and errno.
error File::_errno(const char *op) {
File *f = this;
return _pathError(op, f->_path, errno);
......@@ -85,19 +85,20 @@ static error _pathError(const char *op, const string &path, int syserr) {
// mm::
namespace mm {
// map_into memory-maps f.fd[offset +size) into [addr +size).
// prot is PROT_* from mmap(2).
// flags is MAP_* from mmap(2); MAP_FIXED is added automatically.
error map_into(void *addr, size_t size, int prot, int flags, const os::File &f, off_t offset) {
void *addr2;
addr2 = mmap(addr, size, prot, MAP_FIXED | flags, f.fd(), offset);
if (addr2 == MAP_FAILED)
return _pathError("mmap", f.name(), errno);
if (addr2 != addr)
panic("mmap(addr, MAP_FIXED): returned !addr");
return nil;
}
// map_into memory-maps f.fd[offset +size) into [addr +size).
// prot is PROT_* from mmap(2).
// flags is MAP_* from mmap(2); MAP_FIXED is added automatically.
error map_into(void *addr, size_t size, int prot, int flags, const os::File &f, off_t offset) {
void *addr2;
addr2 = mmap(addr, size, prot, MAP_FIXED | flags, f.fd(), offset);
if (addr2 == MAP_FAILED)
return _pathError("mmap", f.name(), errno);
if (addr2 != addr)
panic("mmap(addr, MAP_FIXED): returned !addr");
return nil;
}
} // mm::
......
......@@ -25,7 +25,7 @@
using namespace golang;
// XXX hack: C++ does not have __builtin_types_compatible_p, but CCAN configure
// think it does because CCAN is configired via C, not C++
// thinks it does because CCAN is configired via C, not C++.
#include <config.h>
#undef HAVE_BUILTIN_TYPES_COMPATIBLE_P
#define HAVE_BUILTIN_TYPES_COMPATIBLE_P 0
......@@ -56,10 +56,10 @@ using set = std::unordered_set<Key>;
using std::vector;
typedef uint64_t Tid; // XXX ok?
typedef uint64_t Oid; // XXX ok?
typedef uint64_t Tid;
typedef uint64_t Oid;
// TidHead is invalid Tid which is larged Tid value and means @head
// TidHead is invalid Tid which is largest Tid value and means @head.
const Tid TidHead = -1ULL;
static string h(uint64_t v); // v -> 016x hex representation
......@@ -67,7 +67,6 @@ static string h(uint64_t v); // v -> 016x hex representation
static error mmap_zero_into_ro(void *addr, size_t size);
// XXX ok?
struct IContext {
virtual chan<structZ> done() = 0;
......@@ -86,7 +85,7 @@ struct WCFS {
tuple<Conn*, error> connect(Tid at);
string _path(const string &obj);
tuple<os::File, error> _open(const string &path, int mode=O_RDONLY);
tuple<os::File, error> _open(const string &path, int flags=O_RDONLY);
tuple<WatchLink*, error> _openwatch();
};
......@@ -102,7 +101,6 @@ struct Conn {
sync::Mutex _filemu;
dict<Oid, _File*> _filetab; // {} foid -> _file
private:
void _pinner(IContext *ctx);
void _pin1(SrvReq *req);
......@@ -110,15 +108,16 @@ private:
// _File represent isolated file view under Conn.
//
// XXX doc
// XXX doc XXX naming -> _FileView ?
struct _File {
Conn *wconn;
Oid foid; // hex of ZBigFile root object ID
size_t blksize; // block size of this file
os::File headf; // file object of head/file
// .headfsize head/file size is known to be at least headfsize (size ↑=)
dict<int64_t, Tid> pinned; // {} blk -> rev that wcfs already sent us for this file
vector<_Mapping*> mmaps; // []_Mapping ↑blk_start mappings of this file
dict<int64_t, Tid> pinned; // {} blk -> rev that wcfs already sent us for this file
vector<_Mapping*> mmaps; // []_Mapping ↑blk_start mappings of this file
};
// _Mapping represents one mapping of _File.
......@@ -139,7 +138,7 @@ struct _Mapping {
};
// StreamID stands for ID of a stream used over WatchLink.
// StreamID stands for ID of a stream multiplexed over WatchLink.
typedef uint64_t StreamID;
// WatchLink represents /head/watch link opened on wcfs.
......@@ -156,14 +155,14 @@ class WatchLink {
chan<structZ> _rx_eof; // becomes ready when wcfs closes its tx side
// inv.protocol message IO
chan<(StreamID,msg)> _acceptq; // server originated messages go here
sync::Mutex _rxmu;
dict<StreamID, chan<XXX>> _rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet
chan<(StreamID,msg)> _acceptq; // server originated messages go here
sync::Mutex _rxmu;
dict<StreamID, chan<XXX>> _rxtab; // {} stream -> rxq server replies go via here
set<StreamID> _accepted; // streams we accepted but did not replied yet
StreamID _req_next = 1; // stream ID for next client-originated request XXX -> atomic
StreamID _req_next; // stream ID for next client-originated request XXX -> atomic
sync::Mutex _txmu; // serializes writes
bool _txclosed; // XXX = False
bool _txclosed; // XXX -> _txcloseOnce
#if 0
func() _serveCancel
......@@ -222,6 +221,7 @@ void Conn::_pinner(IContext *ctx) {
// XXX panic/exc -> log.CRITICAL
while (1) {
// XXX -> recv inplace into on-stack req ?
SrvReq *req = wconn->_wlink->recvReq(ctx);
if (req == NULL)
return // XXX ok? (EOF - when wcfs closes wlink)
......@@ -238,6 +238,7 @@ void Conn::_pin1(SrvReq *req) {
Conn *wconn = this;
// XXX defer: reply either ack or nak on error
// XXX return error?
wconn->_filemu.lock();
auto _ = wconn->_filetab.find(req->foid);
......@@ -280,8 +281,8 @@ void Conn::_pin1(SrvReq *req) {
// _remmapblk remmaps mapping memory for file[blk] to be viewing database as of @at state.
//
// at=None means unpin to head/ . XXX -> C
// NOTE this does not check wrt virtmem already mapped blk as RW.
// at=TidHead means unpin to head/ .
// NOTE this does not check whether virtmem already mapped blk as RW.
error _Mapping::_remmapblk(int64_t blk, Tid at) {
// XXX err context? blk #<blk> @<at>
_Mapping *mmap = this;
......@@ -313,7 +314,7 @@ error _Mapping::_remmapblk(int64_t blk, Tid at) {
err = fsfile.stat(&st);
if (err != nil)
return err;
ASSERT(st.st_blksize == f->blksize); // FIXME assert
ASSERT(st.st_blksize == f->blksize); // FIXME assert -> return err ?
// block is beyond file size - mmap with zeros - else access to memory
// after file.size will raise SIGBUS. (assumes head/f size ↑=)
......@@ -334,7 +335,6 @@ error _Mapping::_remmapblk(int64_t blk, Tid at) {
// _openwatch opens new watch link on wcfs.
// XXX WatchLink* -> no ptr?
tuple<WatchLink*, error> WCFS::_openwatch() {
WCFS *wc = this;
......@@ -346,12 +346,12 @@ tuple<WatchLink*, error> WCFS::_openwatch() {
return make_tuple((WatchLink *)NULL, err);
WatchLink *wlink = new(WatchLink);
wlink->_wc = wc;
wlink->_f = f;
wlink->_rx_eof = makechan<structZ>();
wlink->_acceptq = makechan<XXX>();
wlink->_req_next = 1;
wlink->_txclosed = false;
wlink->_wc = wc;
wlink->_f = f;
wlink->_rx_eof = makechan<structZ>();
wlink->_acceptq = makechan<XXX>();
wlink->_req_next = 1;
wlink->_txclosed = false;
#if 0
serveCtx, wlink._serveCancel = context.with_cancel(context.background())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment