Commit 2fdfe403 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b9654e0b
...@@ -49,6 +49,32 @@ ...@@ -49,6 +49,32 @@
// //
// Conn.atMu > Conn.mu > FileH.mu // Conn.atMu > Conn.mu > FileH.mu
// //
// Several locks are RWMutex instead of just Mutex not only to allow more
// concurrency, but, in the first place, for correctness: pinner, being the core
// element in handling the WCFS isolation protocol, is effectively invoked
// synchronously from other threads via messages coming through wcfs server.
// For example Conn.resync sends watch request to wcfs and waits for the
// answer. Wcfs server, in turn, sends corresponding pin messages to the pinner
// and _waits_ for the answer before answering to resync:
//
// - - - - - -
// | |
// pinner <------.
// | | wcfs
// resync -------^
// | |
// - - - - - -
// client process
//
//
// This creates the necessity to use RWMutex for locks that pinner and other
// parts of the code could be using at the same time in synchronous mode similar
// to the above. These locks are:
//
// - Conn.atMu
// - Conn.mu
//
//
// XXX link to bigfile/file_zodb.cpp to show how wcfs/client is used for // XXX link to bigfile/file_zodb.cpp to show how wcfs/client is used for
// ZBigFile on client side. // ZBigFile on client side.
...@@ -140,10 +166,10 @@ error _Conn::close() { ...@@ -140,10 +166,10 @@ error _Conn::close() {
}; };
bool alreadyClosed = false; bool alreadyClosed = false;
wconn._mu.lock(); wconn._mu.Lock();
alreadyClosed = (wconn._downErr == errConnClosed); alreadyClosed = (wconn._downErr == errConnClosed);
wconn._downErr = errConnClosed; wconn._downErr = errConnClosed;
wconn._mu.unlock(); wconn._mu.Unlock();
if (alreadyClosed) if (alreadyClosed)
return nil; return nil;
...@@ -162,9 +188,9 @@ error _Conn::close() { ...@@ -162,9 +188,9 @@ error _Conn::close() {
// NOTE after file is closed mappings could continue to survive, but we can no // NOTE after file is closed mappings could continue to survive, but we can no
// longer maintain consistent view. For this reason we change mappings to // longer maintain consistent view. For this reason we change mappings to
// something that gives EFAULT on access. XXX implement // something that gives EFAULT on access. XXX implement
wconn._mu.lock(); wconn._mu.Lock();
defer([&]() { defer([&]() {
wconn._mu.unlock(); wconn._mu.Unlock();
}); });
// XXX f locking // XXX f locking
...@@ -200,12 +226,13 @@ error _Conn::_pinner(context::Context ctx) { ...@@ -200,12 +226,13 @@ error _Conn::_pinner(context::Context ctx) {
} }
// mark the connection non-operational if the pinner fails // mark the connection non-operational if the pinner fails
wconn._mu.lock(); // XXX locking ok? -> merge into below where lock is held? // XXX deadlock wrt resync? (who read-locks wconn.mu)
wconn._mu.Lock(); // XXX locking ok? -> merge into below where lock is held?
if (wconn._downErr == nil) { if (wconn._downErr == nil) {
wconn._downErr = fmt::errorf("no longer operational due to: %w", err); // XXX err=nil ? wconn._downErr = fmt::errorf("no longer operational due to: %w", err); // XXX err=nil ?
// XXX make all fileh and mapping invalid. // XXX make all fileh and mapping invalid.
} }
wconn._mu.unlock(); wconn._mu.Unlock();
return err; return err;
} }
...@@ -222,9 +249,9 @@ error _Conn::__pinner(context::Context ctx) { ...@@ -222,9 +249,9 @@ error _Conn::__pinner(context::Context ctx) {
if (err != nil) { if (err != nil) {
// it is ok if we receive EOF due to us (client) closing the connection // it is ok if we receive EOF due to us (client) closing the connection
if (err == io::EOF_) { if (err == io::EOF_) {
wconn._mu.lock(); wconn._mu.RLock();
err = (wconn._downErr == errConnClosed) ? nil : io::ErrUnexpectedEOF; err = (wconn._downErr == errConnClosed) ? nil : io::ErrUnexpectedEOF;
wconn._mu.unlock(); wconn._mu.RUnlock();
} }
return E(err); return E(err);
} }
...@@ -268,19 +295,19 @@ error _Conn::__pin1(PinReq *req) { ...@@ -268,19 +295,19 @@ error _Conn::__pin1(PinReq *req) {
}); });
// XXX deadlock wrt Conn.resync which locks wconn.mu and does "watch" ? // XXX deadlock wrt Conn.resync which locks wconn.mu and does "watch" ?
wconn._mu.lock(); wconn._mu.RLock();
// XXX +incref f, so that simultaneous close does not remove f from wconn.filehTab ? // XXX +incref f, so that simultaneous close does not remove f from wconn.filehTab ?
// XXX or just make FileH.close lock f too to synchronize with pinner? // XXX or just make FileH.close lock f too to synchronize with pinner?
tie(f, ok) = wconn._filehTab.get_(req->foid); tie(f, ok) = wconn._filehTab.get_(req->foid);
if (!ok) { if (!ok) {
wconn._mu.unlock(); wconn._mu.RUnlock();
// why wcfs sent us this update? // why wcfs sent us this update?
return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid)); return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid));
} }
// XXX <- f._openReady ? // XXX <- f._openReady ?
wconn._mu.unlock(); // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close? wconn._mu.RUnlock(); // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close?
f->_mu.lock(); f->_mu.lock();
defer([&]() { defer([&]() {
f->_mu.unlock(); f->_mu.unlock();
...@@ -351,17 +378,17 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) { ...@@ -351,17 +378,17 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) {
xerr::Contextf E("%s: open f<%s>", v(wconn), v(foid)); xerr::Contextf E("%s: open f<%s>", v(wconn), v(foid));
wconn._mu.lock(); wconn._mu.Lock();
if (wconn._downErr != nil) { if (wconn._downErr != nil) {
wconn._mu.unlock(); wconn._mu.Unlock();
return make_pair(nil, E(wconn._downErr)); return make_pair(nil, E(wconn._downErr));
} }
FileH f; bool ok; FileH f; bool ok;
tie(f, ok) = wconn._filehTab.get_(foid); tie(f, ok) = wconn._filehTab.get_(foid);
if (ok) { if (ok) {
wconn._mu.unlock(); wconn._mu.Unlock();
f->_openReady.recv(); f->_openReady.recv();
if (f->_openErr != nil) if (f->_openErr != nil)
...@@ -383,13 +410,14 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) { ...@@ -383,13 +410,14 @@ pair<FileH, error> _Conn::open(zodb::Oid foid) {
wconn._filehTab[foid] = f; wconn._filehTab[foid] = f;
defer([&]() { defer([&]() {
if (!retok) { if (!retok) {
wconn._mu.lock(); wconn._mu.Lock();
// XXX assert filehTab[foid] == f
wconn._filehTab.erase(foid); wconn._filehTab.erase(foid);
wconn._mu.unlock(); wconn._mu.Unlock();
} }
f->_openReady.close(); f->_openReady.close();
}); });
wconn._mu.unlock(); wconn._mu.Unlock();
f->_openErr = f->_open(); f->_openErr = f->_open();
if (f->_openErr != nil) if (f->_openErr != nil)
...@@ -460,11 +488,11 @@ error _FileH::close() { ...@@ -460,11 +488,11 @@ error _FileH::close() {
// remove fileh from wconn._filehTab // remove fileh from wconn._filehTab
// fileh.close can be called several times and after first call another // fileh.close can be called several times and after first call another
// fileh could be opened for the same foid. Be careful not to erase it. // fileh could be opened for the same foid. Be careful not to erase it.
wconn->_mu.lock(); wconn->_mu.Lock();
// XXX decref open count // XXX decref open count
if (wconn->_filehTab.get(fileh.foid)._ptr() == &fileh) if (wconn->_filehTab.get(fileh.foid)._ptr() == &fileh)
wconn->_filehTab.erase(fileh.foid); wconn->_filehTab.erase(fileh.foid);
wconn->_mu.unlock(); wconn->_mu.Unlock();
return E(fileh._headf->close()); return E(fileh._headf->close());
} }
...@@ -609,9 +637,9 @@ error _Conn::resync(zodb::Tid at) { ...@@ -609,9 +637,9 @@ error _Conn::resync(zodb::Tid at) {
// files stays the same during whole resync. // files stays the same during whole resync.
bool atMuWLocked = true; bool atMuWLocked = true;
wconn._atMu.Lock(); wconn._atMu.Lock();
wconn._mu.lock(); wconn._mu.RLock();
defer([&]() { defer([&]() {
wconn._mu.unlock(); wconn._mu.RUnlock();
if (atMuWLocked) if (atMuWLocked)
wconn._atMu.Unlock(); wconn._atMu.Unlock();
else else
...@@ -629,7 +657,8 @@ error _Conn::resync(zodb::Tid at) { ...@@ -629,7 +657,8 @@ error _Conn::resync(zodb::Tid at) {
bool retok = false; bool retok = false;
defer([&]() { defer([&]() {
if (!retok) if (!retok)
panic("TODO: bring wconn + fileh + mmaps down on errror"); // XXX //panic("TODO: bring wconn + fileh + mmaps down on error"); // XXX
fprintf(stderr, "\n\nTODO: bring wconn + fileh + mmaps down on error\n\n\n");
}); });
// set new wconn.at early, so that e.g. Conn.open running simultaneously // set new wconn.at early, so that e.g. Conn.open running simultaneously
...@@ -692,7 +721,7 @@ error _Conn::resync(zodb::Tid at) { ...@@ -692,7 +721,7 @@ error _Conn::resync(zodb::Tid at) {
// Now other calls, e.g. Conn.open, can be running simultaneously to us, // Now other calls, e.g. Conn.open, can be running simultaneously to us,
// but since we already set wconn.at to new value it is ok. For example // but since we already set wconn.at to new value it is ok. For example
// Conn.open, for not-yet-opened file, will use new at to send "watch". // Conn.open, for not-yet-opened file, will use new at to send "watch".
// XXX ^^^ not possible since wconn._mu is locked ? // XXX ^^^ not possible since wconn._mu is locked ? -> no, possible, wconn._mu became RWMutex
// //
// XXX we are still holding wconn._mu locked, so wconn._filehTab is the // XXX we are still holding wconn._mu locked, so wconn._filehTab is the
// same as in previous pass above. // same as in previous pass above.
......
...@@ -180,7 +180,8 @@ struct _Conn : object { ...@@ -180,7 +180,8 @@ struct _Conn : object {
sync::RWMutex _atMu; sync::RWMutex _atMu;
zodb::Tid at; zodb::Tid at;
sync::Mutex _mu; // _atMu.W | _atMu.R + _mu // sync::Mutex _mu; // _atMu.W | _atMu.R + _mu
sync::RWMutex _mu; // _atMu.W | _atMu.R + _mu
error _downErr; // !nil if connection is closed or no longer operational error _downErr; // !nil if connection is closed or no longer operational
dict<zodb::Oid, FileH> _filehTab; // {} foid -> fileh dict<zodb::Oid, FileH> _filehTab; // {} foid -> fileh
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment