Commit aeb11950 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d87991ec
...@@ -146,6 +146,8 @@ def close(wconn): ...@@ -146,6 +146,8 @@ def close(wconn):
f.headf.close() f.headf.close()
f.headf = None f.headf = None
# XXX stop watching f
wconn._filetab = None wconn._filetab = None
...@@ -197,6 +199,8 @@ def mmap(wconn, foid, blk_start, blk_len): # -> Mapping ...@@ -197,6 +199,8 @@ def mmap(wconn, foid, blk_start, blk_len): # -> Mapping
f.mmaps = [] f.mmaps = []
wconn._filetab[foid] = f wconn._filetab[foid] = f
# XXX start watching f
# XXX relock wconn -> f ? # XXX relock wconn -> f ?
# create memory with head/f mapping and applied pins # create memory with head/f mapping and applied pins
...@@ -260,8 +264,9 @@ def unmap(mmap): ...@@ -260,8 +264,9 @@ def unmap(mmap):
# WatchLink represents /head/watch link opened on wcfs. # WatchLink represents /head/watch link opened on wcfs.
# #
# .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages. # .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
# .close() closes the link.
# #
# XXX safe/not-safe to access from multiple threads? # It is safe to use WatchLink from multiple threads simultaneously.
class WatchLink(object): class WatchLink(object):
def __init__(wlink, wc): def __init__(wlink, wc):
...@@ -290,6 +295,7 @@ class WatchLink(object): ...@@ -290,6 +295,7 @@ class WatchLink(object):
wlink._rxtab = {} # stream -> rxq server replies go via here wlink._rxtab = {} # stream -> rxq server replies go via here
wlink._accepted = set() # of stream streams we accepted but did not replied yet wlink._accepted = set() # of stream streams we accepted but did not replied yet
wlink._req_next = 1 # stream ID for next client-originated request XXX -> atomic
wlink._txmu = threading.Lock() # serializes writes wlink._txmu = threading.Lock() # serializes writes
wlink._txclosed = False wlink._txclosed = False
...@@ -298,7 +304,7 @@ class WatchLink(object): ...@@ -298,7 +304,7 @@ class WatchLink(object):
wlink._serveWG.go(wlink._serveRX) wlink._serveWG.go(wlink._serveRX)
# this tWatchLink currently watches the following files at particular state. # this tWatchLink currently watches the following files at particular state.
# XXX back -> tWatchLink ? # XXX test only: back -> tWatchLink ?
wlink._watching = {} # {} foid -> tWatch wlink._watching = {} # {} foid -> tWatch
...@@ -316,6 +322,7 @@ class WatchLink(object): ...@@ -316,6 +322,7 @@ class WatchLink(object):
wlink._wtx.close() wlink._wtx.close()
wlink._txclosed = True wlink._txclosed = True
# close closes the link.
def close(wlink): def close(wlink):
wlink._closeTX() wlink._closeTX()
wlink._serveCancel() wlink._serveCancel()
...@@ -410,8 +417,6 @@ class WatchLink(object): ...@@ -410,8 +417,6 @@ class WatchLink(object):
wlink._wtx.flush() wlink._wtx.flush()
# sendReq sends client -> server request and returns server reply. # sendReq sends client -> server request and returns server reply.
#
# only 1 sendReq must be used at a time. # XXX relax?
def sendReq(wlink, ctx, req): # -> reply | None when EOF def sendReq(wlink, ctx, req): # -> reply | None when EOF
rxq = wlink._sendReq(ctx, req) rxq = wlink._sendReq(ctx, req)
...@@ -424,7 +429,9 @@ class WatchLink(object): ...@@ -424,7 +429,9 @@ class WatchLink(object):
return _rx return _rx
def _sendReq(wlink, ctx, req): # -> rxq def _sendReq(wlink, ctx, req): # -> rxq
stream = 1 # XXX -> dynamic with wlink._txmu: # XXX -> atomic (currently uses arbitrary lock)
stream = wlink._req_next
wlink._req_next = (wlink._req_next + 2) & ((1<<64)-1)
rxq = chan() rxq = chan()
with wlink._rxmu: with wlink._rxmu:
...@@ -435,8 +442,6 @@ class WatchLink(object): ...@@ -435,8 +442,6 @@ class WatchLink(object):
return rxq return rxq
# recvReq receives client <- server request. # recvReq receives client <- server request.
#
# multiple recvReq could be used at a time.
def recvReq(wlink, ctx): # -> SrvReq | None when EOF def recvReq(wlink, ctx): # -> SrvReq | None when EOF
_, _rx = select( _, _rx = select(
ctx.done().recv, # 0 ctx.done().recv, # 0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment