Commit f4a422b7 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 02470041
...@@ -105,8 +105,25 @@ class WatchLink(object): ...@@ -105,8 +105,25 @@ class WatchLink(object):
wlink._wrx = os.fdopen(wh, 'rb') wlink._wrx = os.fdopen(wh, 'rb')
wlink._wtx = os.fdopen(wh2, 'wb') wlink._wtx = os.fdopen(wh2, 'wb')
# XXX vvv -> test only?
wlink.rx_eof = chan() # becomes ready when wcfs closes its tx side
wlink.fatalv = [] # fatal messages received from wcfs
# XXX ... # inv.protocol message IO
wlink._acceptq = chan() # (stream, msg) server originated messages go here
wlink._rxmu = threading.Lock()
wlink._rxtab = {} # stream -> rxq server replies go via here
wlink._accepted = set() # of stream streams we accepted but did not replied yet
wlink._txmu = threading.Lock() # serializes writes
wlink._txclosed = False
serveCtx, wlink._serveCancel = context.with_cancel(context.background())
wlink._serveWG = sync.WorkGroup(serveCtx)
wlink._serveWG.go(wlink._serveRX)
# this tWatchLink currently watches the following files at particular state.
wlink._watching = {} # {} foid -> tWatch
# ---- WatchLink message IO ---- # ---- WatchLink message IO ----
......
...@@ -725,41 +725,7 @@ class tWatchLink(wcfs.WatchLink): ...@@ -725,41 +725,7 @@ class tWatchLink(wcfs.WatchLink):
def __init__(t, tdb): def __init__(t, tdb):
super(tWatchLink, t).__init__(tdb.wc) super(tWatchLink, t).__init__(tdb.wc)
t.tdb = tdb t.tdb = tdb
# head/watch handle.
#
# python/stdio lock file object on read/write, however we need both
# read and write to be working simultaneously.
# -> use 2 separate file objects for rx and tx.
#
# fdopen takes ownership of file descriptor and closes it when file
# object is closed -> dup fd so that each file object has its own fd.
wh = os.open(tdb.wc._path("head/watch"), os.O_RDWR)
wh2 = os.dup(wh)
t._wrx = os.fdopen(wh, 'rb')
t._wtx = os.fdopen(wh2, 'wb')
t.rx_eof = chan() # becomes ready when wcfs closes its tx side
t.fatalv = [] # fatal messages received from wcfs
# inv.protocol message IO
t._acceptq = chan() # (stream, msg) server originated messages go here
t._rxmu = threading.Lock()
t._rxtab = {} # stream -> rxq server replies go via here
t._accepted = set() # of stream streams we accepted but did not replied yet
t._txmu = threading.Lock() # serializes writes
t._txclosed = False
serveCtx, t._serveCancel = context.with_cancel(context.background())
t._serveWG = sync.WorkGroup(serveCtx)
t._serveWG.go(t._serveRX)
# this tWatchLink currently watches the following files at particular state.
t._watching = {} # {} foid -> tWatch
tdb._wlinks.add(t) tdb._wlinks.add(t)
def _closeTX(t): def _closeTX(t):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment