Commit 3f83469c authored by Kirill Smelkov's avatar Kirill Smelkov

X wcfs: client: Handle fork

Without special care a forked child may interfere in parent-wcfs
exchange via Python GC -> PyFileH.__del__ -> FileH.close -> message to
WCFS sent from the child. This actually happens for real when running
test.py/neo-wcfs because NEO test cluster spawns master and storage
nodes with just fork without exec.

-> detach from wcfs in child right after fork and deactivate all
mappings in order not to provide stale data. See top-level comments
added to wcfs/client/wcfs.cpp for details.
parent c2c35851
...@@ -29,7 +29,7 @@ from __future__ import print_function, absolute_import ...@@ -29,7 +29,7 @@ from __future__ import print_function, absolute_import
from golang import func, defer, error, b from golang import func, defer, error, b
from wendelin.bigfile.file_zodb import ZBigFile from wendelin.bigfile.file_zodb import ZBigFile
from wendelin.wcfs.wcfs_test import tDB, tAt from wendelin.wcfs.wcfs_test import tDB, tAt, timeout, waitfor_, eprint
from wendelin.wcfs import wcfs_test from wendelin.wcfs import wcfs_test
from wendelin.wcfs.internal.wcfs_test import read_mustfault from wendelin.wcfs.internal.wcfs_test import read_mustfault
from wendelin.wcfs.internal import mm from wendelin.wcfs.internal import mm
...@@ -37,6 +37,9 @@ from wendelin.wcfs.internal import mm ...@@ -37,6 +37,9 @@ from wendelin.wcfs.internal import mm
from pytest import raises from pytest import raises
from golang.golang_test import panics from golang.golang_test import panics
import os, multiprocessing, gc
# so that e.g. testdb is set up + ... # so that e.g. testdb is set up + ...
def setup_module(): wcfs_test.setup_module() def setup_module(): wcfs_test.setup_module()
def teardown_module(): wcfs_test.teardown_module() def teardown_module(): wcfs_test.teardown_module()
...@@ -275,6 +278,60 @@ def test_wcfs_client_down_efault(): ...@@ -275,6 +278,60 @@ def test_wcfs_client_down_efault():
with raises(error, match=".*: file already closed"): fh1.mmap(2, 3) # fh1 was explicitly closed ^^^ with raises(error, match=".*: file already closed"): fh1.mmap(2, 3) # fh1 was explicitly closed ^^^
# verify that on fork client turns all child's wcfs mappings to efault and
# detaches from wcfs. (else even automatic FileH.__del__ - caused by GC in child
# - can send message to wcfs server and this way break parent-wcfs exchange).
@func
def test_wcfs_client_afterfork():
t = tDB(); zf = t.zfile; at0=t.at0
defer(t.close)
# initial setup
at1 = t.commit(zf, {1:'b1', 3:'d1'})
wconn = t.wc.connect(at1)
defer(wconn.close)
fh = wconn.open(zf._p_oid); defer(fh.close)
m = fh.mmap(0, 4); tm = tMapping(t, m)
tm.assertBlk(0, '', {})
tm.assertBlk(1, 'b1', {})
tm.assertBlk(2, '', {})
tm.assertBlk(3, 'd1', {})
# fork child and verify that it does not interact with wcfs
def forkedchild():
tm.assertBlkFaults(0)
tm.assertBlkFaults(1)
tm.assertBlkFaults(2)
tm.assertBlkFaults(3)
fh.close() # must be noop in child
gc.collect()
os._exit(0) # NOTE not sys.exit not to execute deferred cleanup prepared by parent
p = multiprocessing.Process(target=forkedchild)
p.start()
if not waitfor_(timeout(), lambda: p.exitcode is not None):
eprint("\nC: child stuck")
eprint("-> kill it (pid %s) ...\n" % p.pid)
p.terminate()
p.join()
assert p.exitcode == 0
# make sure that parent can continue using wcfs normally
at2 = t.commit(zf, {1:'b2'})
tm.assertBlk(0, '', {})
tm.assertBlk(1, 'b1', {1:at1}) # pinned @at1
tm.assertBlk(2, '', {1:at1})
tm.assertBlk(3, 'd1', {1:at1})
wconn.resync(at2) # unpins 1 to @head
tm.assertBlk(0, '', {})
tm.assertBlk(1, 'b2', {})
tm.assertBlk(2, '', {})
tm.assertBlk(3, 'd1', {})
# TODO try to unit test at wcfs client level wcfs.Mapping with dirty RW page - # TODO try to unit test at wcfs client level wcfs.Mapping with dirty RW page -
# that it stays in sync with DB after dirty discard. # that it stays in sync with DB after dirty discard.
......
...@@ -183,6 +183,25 @@ ...@@ -183,6 +183,25 @@
// (*) see "Wcfs locking organization" in wcfs.go // (*) see "Wcfs locking organization" in wcfs.go
// (%) see related comment in Conn.__pin1 for details. // (%) see related comment in Conn.__pin1 for details.
// Handling of fork
//
// When a process calls fork, OS copies its memory and creates child process
// with only 1 thread. That child inherits file descriptors and memory mappings
// from parent. To correctly continue using Conn, FileH and Mappings, the child
// must recreate pinner thread and reconnect to wcfs via reopened watchlink.
// The reason here is that without reconnection - by using watchlink file
// descriptor inherited from parent - the child would interfere into
// parent-wcfs exchange and neither parent nor child could continue normal
// protocol communication with WCFS.
//
// For simplicity, since fork is seldomly used for things besides followup
// exec, wcfs client currently takes straightforward approach by disabling
// mappings and detaching from WCFS server in the child right after fork. This
// ensures that there is no interference into parent-wcfs exchange should child
// decide not to exec and to continue running in the forked thread. Without
// this protection the interference might come even automatically via e.g.
// Python GC -> PyFileH.__del__ -> FileH.close -> message to WCFS.
#include "wcfs_misc.h" #include "wcfs_misc.h"
#include "wcfs.h" #include "wcfs.h"
...@@ -301,6 +320,10 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) { ...@@ -301,6 +320,10 @@ pair<Conn, error> WCFS::connect(zodb::Tid at) {
wconn->at = at; wconn->at = at;
wconn->_wlink = wlink; wconn->_wlink = wlink;
os::RegisterAfterFork(newref(
static_cast<os::_IAfterFork*>( wconn._ptr() )
));
context::Context pinCtx; context::Context pinCtx;
tie(pinCtx, wconn->_pinCancel) = context::with_cancel(context::background()); tie(pinCtx, wconn->_pinCancel) = context::with_cancel(context::background());
wconn->_pinWG = sync::NewWorkGroup(pinCtx); wconn->_pinWG = sync::NewWorkGroup(pinCtx);
...@@ -317,6 +340,7 @@ static global<error> errConnClosed = errors::New("connection closed"); ...@@ -317,6 +340,7 @@ static global<error> errConnClosed = errors::New("connection closed");
// //
// opened fileh and mappings become invalid to use except close and unmap. // opened fileh and mappings become invalid to use except close and unmap.
error _Conn::close() { error _Conn::close() {
// NOTE keep in sync with Conn.afterFork
_Conn& wconn = *this; _Conn& wconn = *this;
// lock virtmem early. TODO more granular virtmem locking (see __pin1 for // lock virtmem early. TODO more granular virtmem locking (see __pin1 for
...@@ -409,9 +433,46 @@ error _Conn::close() { ...@@ -409,9 +433,46 @@ error _Conn::close() {
if (!errors::Is(err, context::canceled)) // canceled - ok if (!errors::Is(err, context::canceled)) // canceled - ok
reterr1(err); reterr1(err);
os::UnregisterAfterFork(newref(
static_cast<os::_IAfterFork*>( &wconn )
));
return E(eret); return E(eret);
} }
// afterFork detaches from wcfs in child process right after fork.
//
// opened fileh are closed abruptly without sending "bye" not to interfere into
// parent-wcfs exchange. Existing mappings become invalid to use.
void _Conn::afterFork() {
// NOTE keep in sync with Conn.close
_Conn& wconn = *this;
// ↓↓↓ parallels Conn::close, but without locking and exchange with wcfs.
//
// After fork in child we are the only thread that exists/runs.
// -> no need to lock anything; trying to use locks could even deadlock,
// because locks state is snapshotted from at fork time, when a lock could
// be already locked by some thread.
bool alreadyClosed = (wconn._downErr == errConnClosed);
if (alreadyClosed)
return;
// close all files and make mappings efault.
while (!wconn._filehTab.empty()) {
FileH f = wconn._filehTab.begin()->second;
// close f even if f->_state < _FileHOpened
// (in parent closure of opening-in-progress files is done by
// Conn::open, but in child we are the only one to release resources)
f->_afterFork();
}
// NOTE no need to wlink->close() - wlink handles afterFork by itself.
// NOTE no need to signal pinner, as fork does not clone the pinner into child.
}
// _pinner receives pin messages from wcfs and adjusts wconn file mappings. // _pinner receives pin messages from wcfs and adjusts wconn file mappings.
error _Conn::_pinner(context::Context ctx) { error _Conn::_pinner(context::Context ctx) {
Conn wconn = newref(this); // newref for go Conn wconn = newref(this); // newref for go
...@@ -944,6 +1005,7 @@ error _FileH::close() { ...@@ -944,6 +1005,7 @@ error _FileH::close() {
// - virt_lock // - virt_lock
// - wconn.atMu // - wconn.atMu
error _FileH::_closeLocked(bool force) { error _FileH::_closeLocked(bool force) {
// NOTE keep in sync with FileH._afterFork
_FileH& fileh = *this; _FileH& fileh = *this;
Conn wconn = fileh.wconn; Conn wconn = fileh.wconn;
...@@ -1018,6 +1080,40 @@ error _FileH::_closeLocked(bool force) { ...@@ -1018,6 +1080,40 @@ error _FileH::_closeLocked(bool force) {
return E(eret); return E(eret);
} }
// _afterFork is similar to _closeLocked and releases FileH resource and
// mappings right after fork.
void _FileH::_afterFork() {
// NOTE keep in sync with FileH._closeLocked
_FileH& fileh = *this;
Conn wconn = fileh.wconn;
// ↓↓↓ parallels FileH._closeLocked but without locking and wcfs exchange.
//
// There is no locking (see Conn::afterFork for why) and we shutdown file
// even if ._state == _FileHClosing, because that state was copied from
// parent and it is inside parent where it is another thread that is
// currently closing *parent's* FileH.
if (fileh._state == _FileHClosed) // NOTE _not_ >= _FileHClosing
return;
// don't send to wcfs "stop watch f" not to disrupt parent-wcfs exchange.
// just close the file.
if (wconn->_filehTab.get(fileh.foid)._ptr() != &fileh)
panic("BUG: fileh.closeAfterFork: wconn.filehTab[fileh.foid] != fileh");
wconn->_filehTab.erase(fileh.foid);
fileh._headf->close(); // ignore err
// change all fileh.mmaps to cause EFAULT on access
for (auto mmap : fileh._mmaps) {
mmap->__remmapAsEfault(); // ignore err
}
// done
fileh._state = _FileHClosed;
}
// mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state. // mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state.
// //
// If vma != nil, created mapping is associated with that vma of user-space virtual memory manager: // If vma != nil, created mapping is associated with that vma of user-space virtual memory manager:
...@@ -1427,6 +1523,9 @@ string _Mapping::String() const { ...@@ -1427,6 +1523,9 @@ string _Mapping::String() const {
_Conn::_Conn() {} _Conn::_Conn() {}
_Conn::~_Conn() {} _Conn::~_Conn() {}
void _Conn::incref() {
object::incref();
}
void _Conn::decref() { void _Conn::decref() {
if (__decref()) if (__decref())
delete this; delete this;
......
...@@ -169,7 +169,7 @@ struct WCFS { ...@@ -169,7 +169,7 @@ struct WCFS {
// Conn logically mirrors ZODB.Connection . // Conn logically mirrors ZODB.Connection .
// It is safe to use Conn from multiple threads simultaneously. // It is safe to use Conn from multiple threads simultaneously.
typedef refptr<struct _Conn> Conn; typedef refptr<struct _Conn> Conn;
struct _Conn : object { struct _Conn : os::_IAfterFork, object {
WCFS *_wc; WCFS *_wc;
WatchLink _wlink; // watch/receive pins for mappings created under this conn WatchLink _wlink; // watch/receive pins for mappings created under this conn
...@@ -193,6 +193,7 @@ private: ...@@ -193,6 +193,7 @@ private:
~_Conn(); ~_Conn();
friend pair<Conn, error> WCFS::connect(zodb::Tid at); friend pair<Conn, error> WCFS::connect(zodb::Tid at);
public: public:
void incref();
void decref(); void decref();
public: public:
...@@ -207,6 +208,8 @@ private: ...@@ -207,6 +208,8 @@ private:
error __pinner(context::Context ctx); error __pinner(context::Context ctx);
error _pin1(PinReq *req); error _pin1(PinReq *req);
error __pin1(PinReq *req); error __pin1(PinReq *req);
void afterFork();
}; };
// FileH represent isolated file view under Conn. // FileH represent isolated file view under Conn.
...@@ -264,6 +267,7 @@ public: ...@@ -264,6 +267,7 @@ public:
error _open(); error _open();
error _closeLocked(bool force); error _closeLocked(bool force);
void _afterFork();
}; };
// Mapping represents one memory mapping of FileH. // Mapping represents one memory mapping of FileH.
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <golang/errors.h> #include <golang/errors.h>
#include <golang/fmt.h> #include <golang/fmt.h>
#include <golang/io.h> #include <golang/io.h>
#include <golang/sync.h>
using namespace golang; using namespace golang;
#include <inttypes.h> #include <inttypes.h>
...@@ -32,6 +33,7 @@ using namespace golang; ...@@ -32,6 +33,7 @@ using namespace golang;
#include <unistd.h> #include <unistd.h>
#include <sys/mman.h> #include <sys/mman.h>
#include <algorithm>
#include <memory> #include <memory>
// golang:: // golang::
...@@ -134,6 +136,59 @@ static error _pathError(const char *op, const string &path, int syserr) { ...@@ -134,6 +136,59 @@ static error _pathError(const char *op, const string &path, int syserr) {
} }
// afterfork
static sync::Mutex _afterForkMu;
static bool _afterForkInit;
static vector<IAfterFork> _afterForkList;
// _runAfterFork runs handlers registered by RegisterAfterFork.
static void _runAfterFork() {
// we were just forked: This is child process and there is only 1 thread.
// The state of memory was copied from parent.
// There is no other mutators except us.
// -> go through _afterForkList *without* locking.
for (auto obj : _afterForkList) {
obj->afterFork();
}
// reset _afterFork* state because child could want to fork again
new (&_afterForkMu) sync::Mutex;
_afterForkInit = false;
_afterForkList.clear();
}
void RegisterAfterFork(IAfterFork obj) {
_afterForkMu.lock();
defer([&]() {
_afterForkMu.unlock();
});
if (!_afterForkInit) {
int e = pthread_atfork(/*prepare=*/nil, /*parent=*/nil, /*child=*/_runAfterFork);
if (e != 0) {
string estr = fmt::sprintf("pthread_atfork: %s", v(_sysErrString(e)));
panic(v(estr));
}
_afterForkInit = true;
}
_afterForkList.push_back(obj);
}
void UnregisterAfterFork(IAfterFork obj) {
_afterForkMu.lock();
defer([&]() {
_afterForkMu.unlock();
});
// _afterForkList.remove(obj)
_afterForkList.erase(
std::remove(_afterForkList.begin(), _afterForkList.end(), obj),
_afterForkList.end());
}
// _sysErrString returns string corresponding to system error syserr. // _sysErrString returns string corresponding to system error syserr.
static string _sysErrString(int syserr) { static string _sysErrString(int syserr) {
char ebuf[128]; char ebuf[128];
......
...@@ -107,6 +107,25 @@ tuple<File, error> open(const string &path, int flags = O_RDONLY, ...@@ -107,6 +107,25 @@ tuple<File, error> open(const string &path, int flags = O_RDONLY,
S_IRGRP | S_IWGRP | S_IXGRP | S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IWOTH | S_IXOTH); S_IROTH | S_IWOTH | S_IXOTH);
// afterfork
// IAfterFork is the interface that objects must implement to be notified after fork.
typedef refptr<struct _IAfterFork> IAfterFork;
struct _IAfterFork : public _interface {
// afterFork is called in just forked child process for objects that
// were previously registered in parent via RegisterAfterFork.
virtual void afterFork() = 0;
};
// RegisterAfterFork registers obj so that obj.afterFork is run after fork in
// the child process.
void RegisterAfterFork(IAfterFork obj);
// UnregisterAfterFork undoes RegisterAfterFork.
// It is noop if obj was not registered.
void UnregisterAfterFork(IAfterFork obj);
} // os:: } // os::
// mm:: // mm::
......
...@@ -65,6 +65,10 @@ pair<WatchLink, error> WCFS::_openwatch() { ...@@ -65,6 +65,10 @@ pair<WatchLink, error> WCFS::_openwatch() {
wlink->rx_eof = makechan<structZ>(); wlink->rx_eof = makechan<structZ>();
os::RegisterAfterFork(newref(
static_cast<os::_IAfterFork*>( wlink._ptr() )
));
context::Context serveCtx; context::Context serveCtx;
tie(serveCtx, wlink->_serveCancel) = context::with_cancel(context::background()); tie(serveCtx, wlink->_serveCancel) = context::with_cancel(context::background());
wlink->_serveWG = sync::NewWorkGroup(serveCtx); wlink->_serveWG = sync::NewWorkGroup(serveCtx);
...@@ -98,9 +102,24 @@ error _WatchLink::close() { ...@@ -98,9 +102,24 @@ error _WatchLink::close() {
if (err == nil) if (err == nil)
err = err3; err = err3;
os::UnregisterAfterFork(newref(
static_cast<os::_IAfterFork*>( &wlink )
));
return E(err); return E(err);
} }
// afterFork detaches from wcfs in child process right after fork.
void _WatchLink::afterFork() {
_WatchLink& wlink = *this;
// in child right after fork we are the only thread to run; in particular
// _serveRX is not running. Just release the file handle, that fork
// duplicated, to make sure that child cannot send anything to wcfs and
// interfere into parent-wcfs exchange.
wlink._f->close(); // ignore err
}
// closeWrite closes send half of the link. // closeWrite closes send half of the link.
error _WatchLink::closeWrite() { error _WatchLink::closeWrite() {
_WatchLink& wlink = *this; _WatchLink& wlink = *this;
...@@ -487,6 +506,9 @@ string rxPkt::to_string() const { ...@@ -487,6 +506,9 @@ string rxPkt::to_string() const {
_WatchLink::_WatchLink() {} _WatchLink::_WatchLink() {}
_WatchLink::~_WatchLink() {} _WatchLink::~_WatchLink() {}
void _WatchLink::incref() {
object::incref();
}
void _WatchLink::decref() { void _WatchLink::decref() {
if (__decref()) if (__decref())
delete this; delete this;
......
...@@ -70,7 +70,7 @@ static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too low ...@@ -70,7 +70,7 @@ static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too low
// //
// It is safe to use WatchLink from multiple threads simultaneously. // It is safe to use WatchLink from multiple threads simultaneously.
typedef refptr<class _WatchLink> WatchLink; typedef refptr<class _WatchLink> WatchLink;
class _WatchLink : public object { class _WatchLink : public os::_IAfterFork, object {
WCFS *_wc; WCFS *_wc;
os::File _f; // head/watch file handle os::File _f; // head/watch file handle
string _rxbuf; // buffer for data already read from _f string _rxbuf; // buffer for data already read from _f
...@@ -102,6 +102,7 @@ private: ...@@ -102,6 +102,7 @@ private:
~_WatchLink(); ~_WatchLink();
friend pair<WatchLink, error> WCFS::_openwatch(); friend pair<WatchLink, error> WCFS::_openwatch();
public: public:
void incref();
void decref(); void decref();
public: public:
...@@ -122,6 +123,8 @@ private: ...@@ -122,6 +123,8 @@ private:
StreamID _nextReqID(); StreamID _nextReqID();
tuple<chan<rxPkt>, error> _sendReq(context::Context ctx, StreamID stream, const string &req); tuple<chan<rxPkt>, error> _sendReq(context::Context ctx, StreamID stream, const string &req);
void afterFork();
friend error _twlinkwrite(WatchLink wlink, const string &pkt); friend error _twlinkwrite(WatchLink wlink, const string &pkt);
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment