Commit 0327a441 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 62016a67
...@@ -49,23 +49,20 @@ from zodbtools.util import ashex ...@@ -49,23 +49,20 @@ from zodbtools.util import ashex
# WCFS represents connection to wcfs server. # WCFS represents connection to wcfs server.
#
# It has to be created with join.
# Only 1 connection is maintained for one file server. # Only 1 connection is maintained for one file server.
class WCFS(object): class WCFS(object):
# .mountpoint path to wcfs mountpoint # .mountpoint path to wcfs mountpoint
# ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly) # ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly)
# ._njoin this connection was returned for so many joins
# XXX for-testing only? # XXX for-testing only?
# ._proc wcfs process if it was opened by this WCFS | None # ._proc wcfs process if it was opened by this WCFS | None
pass
def __init__(self, mountpoint, fwcfs, proc):
self.mountpoint = mountpoint
self._fwcfs = fwcfs
self._proc = proc
def close(self):
# XXX unmount wcfs as well?
self._fwcfs.close()
"""
# open creates wcfs file handle, which can be mmaped to give data of ZBigFile. # open creates wcfs file handle, which can be mmaped to give data of ZBigFile.
# #
# XXX more text # XXX more text
...@@ -106,6 +103,7 @@ class FileH(object): ...@@ -106,6 +103,7 @@ class FileH(object):
def mmap(self, pgoffset, pglen): def mmap(self, pgoffset, pglen):
return _VMA(self.fd, pgoffset, pglen, pagesize, mmap.PROT_READ) return _VMA(self.fd, pgoffset, pglen, pagesize, mmap.PROT_READ)
"""
...@@ -113,39 +111,24 @@ class FileH(object): ...@@ -113,39 +111,24 @@ class FileH(object):
# ---- join/run wcfs ---- # ---- join/run wcfs ----
# serve starts and runs wcfs server for ZODB @ zurl. _wcmu = threading.Lock()
# _wcregistry = {} # mntpt -> WCFS
# it mounts wcfs at a location that is with 1-1 correspondence with zurl.
# it then waits for wcfs to exit (either due to unmount or an error). @func(WCFS)
# def __init__(wc, mountpoint, fwcfs, proc):
# it is an error if wcfs was already started. wc.mountpoint = mountpoint
# wc._fwcfs = fwcfs
# XXX optv wc._njoin = 1
# if exec_ is True, wcfs is not spawned, but executed into. wc._proc = proc
#
# serve(zurl, exec_=False). @func(WCFS)
def serve(zurl, optv, exec_=False): def close(wc):
mntpt = _mntpt_4zurl(zurl) with _wcmu:
wc._njoin -= 1
# try opening .wcfs - it is an error if we can do it. if wc._njoin == 0:
# XXX -> option to wcfs itself? # XXX unmount wcfs as well?
try: wc._fwcfs.close()
f = open(mntpt + "/.wcfs/zurl") del _wcregistry[wc.mountpoint]
except IOError as e:
if e.errno != ENOENT:
raise
else:
f.close()
raise RuntimeError("wcfs: start %s: already started" % zurl)
# seems to be ok to start
# XXX race window if something starts after ^^^ check
argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
if not exec_:
subprocess.check_call(argv, close_fds=True)
else:
os.execv(argv[0], argv)
# _default_autostart returns default autostart setting for join. # _default_autostart returns default autostart setting for join.
# #
...@@ -162,36 +145,39 @@ def _default_autostart(): ...@@ -162,36 +145,39 @@ def _default_autostart():
# #
# If wcfs for that zurl was already started, join connects to it. # If wcfs for that zurl was already started, join connects to it.
# Otherwise it starts wcfs for zurl if autostart is True. # Otherwise it starts wcfs for zurl if autostart is True.
# def join(zurl, autostart=_default_autostart()): # -> WCFS
# If shared is True - a shared connection is returned - one that will be also XXX -> always this way
# returned for following join(shared=True) requests with the same zurl.
def join(zurl, autostart=_default_autostart(), shared=False): # -> Conn
# XXX implement shared
mntpt = _mntpt_4zurl(zurl) mntpt = _mntpt_4zurl(zurl)
with _wcmu:
# try opening .wcfs - if we succeed - it is already mounted. # check if we already have connection to wcfs server from this process
# XXX -> wcfs itself? wc = _wcregistry.get(mntpt)
try: if wc is not None:
f = open(mntpt + "/.wcfs/zurl") wc._njoin += 1
except IOError as e: return wc
if e.errno != ENOENT:
raise # no. try opening .wcfs - if we succeed - wcfs is already mounted.
else: try:
# already have it f = open(mntpt + "/.wcfs/zurl")
return WCFS(mntpt, f, None) except IOError as e:
if e.errno != ENOENT:
if not autostart: raise
raise RuntimeError("wcfs: join %s: server not started" % zurl) else:
# already have it
# start wcfs with telling it to automatically exit when there is no client activity. wc = WCFS(mntpt, f, None)
optv_extra = os.environ.get("WENDELIN_CORE_WCFS_OPTIONS", "").split() _wcregistry[mntpt] = wc
return _start(zurl, "-autoexit", *optv_extra) return wc
if not autostart:
raise RuntimeError("wcfs: join %s: server not started" % zurl)
# start wcfs with telling it to automatically exit when there is no client activity.
optv_extra = os.environ.get("WENDELIN_CORE_WCFS_OPTIONS", "").split()
return _start(zurl, "-autoexit", *optv_extra)
# _start starts wcfs server for ZODB @ zurl. # _start starts wcfs server for ZODB @ zurl.
# #
# optv can be optionally given to pass flags to wcfs. # optv can be optionally given to pass flags to wcfs.
# called under _wcmu
def _start(zurl, *optv): # -> WCFS def _start(zurl, *optv): # -> WCFS
mntpt = _mntpt_4zurl(zurl) mntpt = _mntpt_4zurl(zurl)
log.info("wcfs: starting for %s ...", zurl) log.info("wcfs: starting for %s ...", zurl)
...@@ -255,9 +241,14 @@ def _start(zurl, *optv): # -> WCFS ...@@ -255,9 +241,14 @@ def _start(zurl, *optv): # -> WCFS
wg.go(_) wg.go(_)
wg.wait() wg.wait()
assert mntpt not in _wcregistry
_wcregistry[mntpt] = wc
return wc return wc
# ---- misc ----
# _wcfs_exe returns path to wcfs executable. # _wcfs_exe returns path to wcfs executable.
def _wcfs_exe(): def _wcfs_exe():
return '%s/wcfs' % dirname(__file__) return '%s/wcfs' % dirname(__file__)
...@@ -301,6 +292,40 @@ def _mkdir_p(path): ...@@ -301,6 +292,40 @@ def _mkdir_p(path):
raise raise
# serve starts and runs wcfs server for ZODB @ zurl.
#
# it mounts wcfs at a location that is with 1-1 correspondence with zurl.
# it then waits for wcfs to exit (either due to unmount or an error).
#
# it is an error if wcfs was already started.
#
# XXX optv
# if exec_ is True, wcfs is not spawned, but executed into.
#
# serve(zurl, exec_=False).
def serve(zurl, optv, exec_=False):
mntpt = _mntpt_4zurl(zurl)
# try opening .wcfs - it is an error if we can do it.
# XXX -> option to wcfs itself?
try:
f = open(mntpt + "/.wcfs/zurl")
except IOError as e:
if e.errno != ENOENT:
raise
else:
f.close()
raise RuntimeError("wcfs: start %s: already started" % zurl)
# seems to be ok to start
# XXX race window if something starts after ^^^ check
argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
if not exec_:
subprocess.check_call(argv, close_fds=True)
else:
os.execv(argv[0], argv)
# if called as main just -> serve() # if called as main just -> serve()
......
...@@ -114,16 +114,23 @@ def test_join(): ...@@ -114,16 +114,23 @@ def test_join():
with raises(RuntimeError, match="wcfs: join .*: server not started"): with raises(RuntimeError, match="wcfs: join .*: server not started"):
wcfs.join(zurl, autostart=False) wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry == {}
def _():
assert wcfs._wcregistry == {}
defer(_)
wc = wcfs._start(zurl) wc = wcfs._start(zurl)
defer(wc.close) defer(wc.close)
assert wc.mountpoint == testmntpt assert wc.mountpoint == testmntpt
assert wc._njoin == 1
assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl
assert os.path.isdir(wc.mountpoint + "/head") assert os.path.isdir(wc.mountpoint + "/head")
assert os.path.isdir(wc.mountpoint + "/head/bigfile") assert os.path.isdir(wc.mountpoint + "/head/bigfile")
wc2 = wcfs.join(zurl, autostart=False) wc2 = wcfs.join(zurl, autostart=False)
defer(wc2.close) defer(wc2.close)
assert wc2.mountpoint == wc.mountpoint assert wc2 is wc
assert wc._njoin == 2
# test that join(autostart=y) works. # test that join(autostart=y) works.
@func @func
...@@ -132,9 +139,15 @@ def test_join_autostart(): ...@@ -132,9 +139,15 @@ def test_join_autostart():
with raises(RuntimeError, match="wcfs: join .*: server not started"): with raises(RuntimeError, match="wcfs: join .*: server not started"):
wcfs.join(zurl, autostart=False) wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry == {}
def _():
assert wcfs._wcregistry == {}
defer(_)
wc = wcfs.join(zurl, autostart=True) wc = wcfs.join(zurl, autostart=True)
defer(wc.close) defer(wc.close)
assert wc.mountpoint == testmntpt assert wc.mountpoint == testmntpt
assert wc._njoin == 1
assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl
assert os.path.isdir(wc.mountpoint + "/head") assert os.path.isdir(wc.mountpoint + "/head")
assert os.path.isdir(wc.mountpoint + "/head/bigfile") assert os.path.isdir(wc.mountpoint + "/head/bigfile")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment