Commit 2c2a094f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a288e2bc
...@@ -69,7 +69,7 @@ ...@@ -69,7 +69,7 @@
// never shrink. It is indeed currently so, but will have to be revisited // never shrink. It is indeed currently so, but will have to be revisited
// if/when wendelin.core adds bigfile truncation. Wcfs client restats // if/when wendelin.core adds bigfile truncation. Wcfs client restats
// wcfs/head/f at every transaction boundary (Conn.resync) and remembers f.size // wcfs/head/f at every transaction boundary (Conn.resync) and remembers f.size
// in FileH._headfsize for use during one transaction. // in FileH._headfsize for use during one transaction(%).
// //
// //
// Integration with wendelin.core virtmem layer // Integration with wendelin.core virtmem layer
...@@ -111,6 +111,7 @@ ...@@ -111,6 +111,7 @@
// (+) currently, for simplicity, there is one pinner thread for each connection. // (+) currently, for simplicity, there is one pinner thread for each connection.
// In the future, for efficiency, it might be reworked to be one pinner thread // In the future, for efficiency, it might be reworked to be one pinner thread
// that serves all connections simultaneously. // that serves all connections simultaneously.
// (%) see _headWait comments on how this has to be reworked.
// Wcfs client locking organization // Wcfs client locking organization
...@@ -167,6 +168,7 @@ ...@@ -167,6 +168,7 @@
#include <golang/errors.h> #include <golang/errors.h>
#include <golang/fmt.h> #include <golang/fmt.h>
#include <golang/io.h> #include <golang/io.h>
#include <golang/time.h>
#include <algorithm> #include <algorithm>
#include <string> #include <string>
...@@ -181,6 +183,8 @@ using std::min; ...@@ -181,6 +183,8 @@ using std::min;
using std::max; using std::max;
using std::vector; using std::vector;
namespace ioutil = io::ioutil;
#define TRACE 0 #define TRACE 0
#if TRACE #if TRACE
...@@ -204,13 +208,21 @@ static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset); ...@@ -204,13 +208,21 @@ static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset);
// connect creates new Conn viewing WCFS state as of @at. // connect creates new Conn viewing WCFS state as of @at.
pair<Conn, error> WCFS::connect(zodb::Tid at) { pair<Conn, error> WCFS::connect(zodb::Tid at) {
WCFS *wc = this; WCFS *wc = this;
xerr::Contextf E("wcfs %s: connect @%s", v(wc->mountpoint), v(at)); xerr::Contextf E("%s: connect @%s", v(wc), v(at));
etrace(""); etrace("");
error err;
// TODO support !isolated mode // TODO support !isolated mode
// need to wait till `wcfs/head/at ≥ at` because e.g. Conn.open stats
// head/f to get f.headfsize.
err = wc->_headWait(at);
if (err != nil) {
return make_pair(nil, E(err));
}
WatchLink wlink; WatchLink wlink;
error err;
tie(wlink, err) = wc->_openwatch(); tie(wlink, err) = wc->_openwatch();
if (err != nil) if (err != nil)
return make_pair(nil, E(err)); return make_pair(nil, E(err));
...@@ -227,25 +239,52 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) { ...@@ -227,25 +239,52 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) {
return wconn->_pinner(ctx); return wconn->_pinner(ctx);
}); });
// need to wait till `wcfs/head/at ≥ at` because e.g. Conn.open stats
// head/f to get f.headfsize.
// XXX atMu.RLock ?
err = wconn->_headWait(at);
if (err != nil) {
// XXX bring conn down - stop pinner
return make_pair(nil, E(err));
}
return make_pair(wconn, nil); return make_pair(wconn, nil);
} }
// _headWait waits till wcfs/head/at becomes ≥ at. // _headWait waits till wcfs/head/at becomes ≥ at.
// //
// XXX locks condition?
// XXX place? // XXX place?
error _Conn::_headWait(zodb::Tid at) { // XXX +ctx ? //
// FIXME implement // _headWait is currently needed, because client stats wcfs/head/f to get f
// size assuming that f size only ↑. The assumption is not generally valid
// (e.g. f might be truncated = hole puched for block at tail), but holds true
// for now. However to get correct results wcfs/head/f has to be statt'ed
// _after_ wcfs view of the database becomes ≥ wconn.at.
//
// TODO extend isolation protocol to report f size as of @at database state at
// watch init/update(*). This way there won't be need for headWait as correct
// file size @at will be returned by wcfs itself, which will also work if
// wcfs/head/f size is changed arbitrarily.
//
// (*) equivalient might be to send something like "pin #<bsize>.. Z" (pin
// blocks bsize till ∞ to zeros).
error WCFS::_headWait(zodb::Tid at) {
WCFS *wc = this;
xerr::Contextf E("%s: headWait @%s", v(wc), v(at));
etrace("");
zodb::Tid xat;
string xatStr;
error err;
// XXX dumb implementation, because _headWait should go away.
while (1) {
tie(xatStr, err) = ioutil::ReadFile(wc->_path("head/at"));
if (err != nil)
return E(err);
tie(xat, err) = xstrconv::parseHex64(xatStr);
if (err != nil)
return E(fmt::errorf("head/at: %w", err));
if (xat >= at)
break;
time::sleep(1E-6); // XXX -> time::millisecond
}
return nil; return nil;
} }
...@@ -555,22 +594,21 @@ error _Conn::resync(zodb::Tid at) { ...@@ -555,22 +594,21 @@ error _Conn::resync(zodb::Tid at) {
wconn._atMu.RUnlock(); wconn._atMu.RUnlock();
etrace(""); etrace("");
// bring wconn + fileh + mmaps down on error
bool retok = false;
defer([&]() {
if (!retok)
wconn.close(); // ignore error
});
// XXX _downErr -> E // XXX _downErr -> E
// XXX at ^ (increases) -> rejected by wcfs XXX or also precheck here? // XXX at ^ (increases) -> rejected by wcfs XXX or also precheck here?
// first wait for wcfs/head to be >= at. // wait for wcfs/head to be >= at.
// we need this e.g. to be sure that head/f.size is at least as big that it will be @at state. // we need this e.g. to be sure that head/f.size is at least as big that it will be @at state.
err = wconn._headWait(at); err = wconn._wc->_headWait(at);
if (err != nil) if (err != nil)
return err; return E(err);
// bring wconn + fileh + mmaps down on error
bool retok = false;
defer([&]() {
if (!retok)
wconn.close(); // ignore error
});
// lock wconn._atMu.W . This excludes everything else, and in // lock wconn._atMu.W . This excludes everything else, and in
// particular _pinner_, from running and mutating files and mappings. // particular _pinner_, from running and mutating files and mappings.
......
...@@ -152,6 +152,7 @@ struct WCFS { ...@@ -152,6 +152,7 @@ struct WCFS {
pair<WatchLink, error> _openwatch(); pair<WatchLink, error> _openwatch();
string String() const; string String() const;
error _headWait(zodb::Tid at);
}; };
// Conn represents logical connection that provides view of data on wcfs // Conn represents logical connection that provides view of data on wcfs
...@@ -208,7 +209,6 @@ private: ...@@ -208,7 +209,6 @@ private:
error __pinner(context::Context ctx); error __pinner(context::Context ctx);
error _pin1(PinReq *req); error _pin1(PinReq *req);
error __pin1(PinReq *req); error __pin1(PinReq *req);
error _headWait(zodb::Tid at);
}; };
// FileH represent isolated file view under Conn. // FileH represent isolated file view under Conn.
......
...@@ -34,6 +34,9 @@ using namespace golang; ...@@ -34,6 +34,9 @@ using namespace golang;
#include <memory> #include <memory>
// golang::
namespace golang {
// os:: // os::
namespace os { namespace os {
...@@ -178,6 +181,46 @@ error unmap(void *addr, size_t size) { ...@@ -178,6 +181,46 @@ error unmap(void *addr, size_t size) {
} // mm:: } // mm::
// io::ioutil::
namespace io {
namespace ioutil {
tuple<string, error> ReadFile(const string& path) {
// errctx is ok as returned by all calls.
os::File f;
error err;
tie(f, err) = os::open(path);
if (err != nil)
return make_tuple("", err);
// XXX defer close
string data;
vector<char> buf(4096);
while (1) {
int n;
tie(n, err) = f->read(&buf[0], buf.size());
data.append(&buf[0], n);
if (err != nil) {
if (err == io::EOF_)
err = nil;
break;
}
}
error err2 = f->close();
if (err == nil)
err = err2;
if (err != nil)
data = "";
return make_tuple(data, err);
}
}} // io::ioutil::
// xstrconv:: (strconv-like) // xstrconv:: (strconv-like)
namespace xstrconv { namespace xstrconv {
...@@ -215,6 +258,9 @@ tuple<uint64_t, error> parseUint(const string& s) { ...@@ -215,6 +258,9 @@ tuple<uint64_t, error> parseUint(const string& s) {
} // xstrconv:: } // xstrconv::
} // golang::
// xerr:: // xerr::
namespace xerr { namespace xerr {
......
...@@ -55,6 +55,9 @@ using std::vector; ...@@ -55,6 +55,9 @@ using std::vector;
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>
// golang::
namespace golang {
// os:: // os::
namespace os { namespace os {
...@@ -116,6 +119,13 @@ namespace mm { ...@@ -116,6 +119,13 @@ namespace mm {
} // mm:: } // mm::
// io::ioutil::
namespace io {
namespace ioutil {
tuple<string, error> ReadFile(const string& path);
}} // io::ioutil::
// ---- misc ---- // ---- misc ----
...@@ -129,6 +139,8 @@ tuple<uint64_t, error> parseUint(const string& s); ...@@ -129,6 +139,8 @@ tuple<uint64_t, error> parseUint(const string& s);
} // xstrconv:: } // xstrconv::
} // golang::
// zodb:: // zodb::
namespace zodb { namespace zodb {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment