Commit ac8296b6 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e62934cf
...@@ -28,8 +28,11 @@ import logging as log ...@@ -28,8 +28,11 @@ import logging as log
from os.path import dirname from os.path import dirname
from golang import go, chan, select, default from golang import go, chan, select, default
from ZODB.FileStorage import FileStorage
# WCFS represents connection to wcfs server. # WCFS represents connection to wcfs server.
# XXX name -> Connection ?
class WCFS(object): class WCFS(object):
# XXX + .zurl? # XXX + .zurl?
# .mountpoint path to mountpoint # .mountpoint path to mountpoint
...@@ -40,15 +43,17 @@ class WCFS(object): ...@@ -40,15 +43,17 @@ class WCFS(object):
# XXX open a file to keep the server from going away (at least cleanly) # XXX open a file to keep the server from going away (at least cleanly)
# run starts and runs wcfs server for ZODB @ zurl. # serve starts and runs wcfs server for ZODB @ zurl.
# #
# it mounts wcfs at a location that is with 1-1 corresponence with zurl. # it mounts wcfs at a location that is with 1-1 corresponence with zurl.
# it then waits for wcfs to exit (either due to unmount or an error). # it then waits for wcfs to exit (either due to unmount or an error).
# #
# it is an error if wcfs was already started. # it is an error if wcfs was already started.
# #
# run(zurl). # if exec_ is True, wcfs is not spawned, but execed to.
def run(zurl): #
# serve(zurl, exec_=False).
def serve(zurl, exec_=False):
mntpt = _mntpt_4zurl(zurl) mntpt = _mntpt_4zurl(zurl)
# try opening .wcfs - it is an error if we can do it. # try opening .wcfs - it is an error if we can do it.
...@@ -64,7 +69,11 @@ def run(zurl): ...@@ -64,7 +69,11 @@ def run(zurl):
# seems to be ok to start # seems to be ok to start
# XXX race window if something starts after ^^^ check # XXX race window if something starts after ^^^ check
subprocess.call([_wcfs_exe(), zurl, mntpt], close_fds=True) argv = [_wcfs_exe(), zurl, mntpt]
if not exec_:
subprocess.check_call(argv, close_fds=True)
else:
os.execv(argv[0], argv)
# join connects to wcfs server for ZODB @ zurl. # join connects to wcfs server for ZODB @ zurl.
...@@ -92,66 +101,86 @@ def join(zurl, autostart=None): ...@@ -92,66 +101,86 @@ def join(zurl, autostart=None):
if not autostart: if not autostart:
raise RuntimeError("wcfs: join %s: server not started" % zurl) raise RuntimeError("wcfs: join %s: server not started" % zurl)
return _start(zurl)
# XXX doc -> WCFS
def _start(zurl):
mntpt = _mntpt_4zurl(zurl)
log.info("wcfs: starting for %s ...", zurl) log.info("wcfs: starting for %s ...", zurl)
# cancelch cancels wcfs server running (and waitmounted in initialization phase) cancel = chan() # cancels wcfs server running (and waitmounted in initialization phase)
cancelch = chan() startedok = chan() # indicates to wcfs server that whole startup was ok
# wcfs spawns and monitors wcfs server. it is running until either wcfs # spawn spawns and monitors wcfs server. it is running until either wcfs
# server terminates or cancel. # server terminates, or cancel or startedok are ready.
ewcfs = chan(1) # err | None ewcfs = chan(1) # err | None
def wcfs(): def spawn():
err = None err = None
try: try:
p = subprocess.Popen([_wcfs_exe(), zurl, mntpt], close_fds=True) p = subprocess.Popen([_wcfs_exe(), zurl, mntpt], close_fds=True)
while 1: while 1:
ret = p.poll() ret = p.poll()
if ret is not None: if ret is not None:
err = "wcfs: exited with %s" % ret err = "spawn: exited with %s" % ret
break break
_, _rx = select( _, _rx = select(
cancelch.recv, # 0 cancel.recv, # 0
default, # 1 startedok.recv, # 1
default, # 2
) )
if _ == 0: if _ == 0:
p.terminate() p.terminate()
break break
time.sleep(1) if _ == 1:
# startup was ok - don't monitor spawned wcfs anylonger
break
time.sleep(0.1)
except Exception as e: except Exception as e:
log.exception("wcfs server") log.exception("wcfs server")
err = "wcfs: %s" % e # XXX wrap with errctx err = "spawn: %s" % e # XXX wrap with errctx
ewcfs.send(err) ewcfs.send(err)
# waitmounted waits till wcfs mount is ready. # waitmounted waits till wcfs mount is ready.
mounted = chan(1) # file | err mounted = chan(1) # file | err
def waitmounted(): def waitmounted():
err = None res = None
# XXX try/except + errctx # XXX errctx
while 1: try:
if os.path.exists("%s/.wcfs" % mntpt): while 1:
break try:
f = open("%s/.wcfs" % mntpt)
except IOError as e:
if e.errno != errno.ENOENT:
raise
else:
res = f
break
_, _rx = select(
cancelch.recv, # 0
default, # 1
)
if _ == 0:
err = "waitmounted: cancel"
break
time.sleep(1) _, _rx = select(
cancel.recv, # 0
default, # 1
)
if _ == 0:
res = "waitmounted: cancel"
break
time.sleep(0.1)
mounted.send(err) except Exception as e:
res = "waitmounted: %s" % e # XXX errctx
mounted.send(res)
# spawn wcfs and wait till it is mounted. # spawn wcfs and wait till it is mounted.
go(wcfs) go(spawn)
go(waitmounted) go(waitmounted)
_, _rx = select( _, _rx = select(
...@@ -160,20 +189,23 @@ def join(zurl, autostart=None): ...@@ -160,20 +189,23 @@ def join(zurl, autostart=None):
) )
if _ == 0: if _ == 0:
# wcfs error # wcfs error
cancelch.close() err = _rx
raise RuntimeError(_rx)
if _ == 1: if _ == 1:
if not isinstance(_rx, file): if not isinstance(_rx, file):
err = _rx # mounted ok - return WCFS object
cancelch.close() f = _rx
raise RuntimeError(err) startedok.close()
return WCFS(mntpt, f)
# waitmounted error
err = _rx
cancel.close()
raise RuntimeError("wcfs: start: %s" % err) # XXX errctx
# mounted ok - construct WCFS object
# XXX do we need to remember cancelch in WCFS and close it upon leaving?
f = _rx
return WCFS(mntpt, f)
...@@ -210,6 +242,6 @@ def _zstor_2zurl(zstor): ...@@ -210,6 +242,6 @@ def _zstor_2zurl(zstor):
# if called as main just -> run(argv[1]) # if called as main just -> serve(argv[1])
def main(): def main():
run(sys.argv[1]) serve(sys.argv[1], exec_=True)
...@@ -19,7 +19,9 @@ ...@@ -19,7 +19,9 @@
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
from wendelin.lib.testing import getTestDB from wendelin.lib.testing import getTestDB
from wendelin import wcfs
from golang import go, chan from golang import go, chan
from pytest import raises
testdb = None testdb = None
def setup_module(): def setup_module():
...@@ -32,9 +34,17 @@ def teardown_module(): ...@@ -32,9 +34,17 @@ def teardown_module():
def test_join(): def test_join():
z = testdb.getZODBStorage() zstor = testdb.getZODBStorage()
1/0 zurl = wcfs._zstor_2zurl(zstor)
with raises(RuntimeError, match="wcfs: join .*: server not started"):
wcfs.join(zurl, autostart=False)
wc = wcfs._start(zurl)
assert isinstance(wcfs.WCFS)
#wc = wcfs.join(zurl, autostart=False)
def test_join_autostart(): def test_join_autostart():
# XXX # XXX
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment