diff --git a/wcfs/__init__.py b/wcfs/__init__.py index 8e215fca87ccc42f57cf9a86ebeed732b5efde25..a6ac3a24b76f93b1f428a5e949d47db56cc7f70f 100644 --- a/wcfs/__init__.py +++ b/wcfs/__init__.py @@ -146,6 +146,8 @@ def close(wconn): f.headf.close() f.headf = None + # XXX stop watching f + wconn._filetab = None @@ -197,6 +199,8 @@ def mmap(wconn, foid, blk_start, blk_len): # -> Mapping f.mmaps = [] wconn._filetab[foid] = f + # XXX start watching f + # XXX relock wconn -> f ? # create memory with head/f mapping and applied pins @@ -260,8 +264,9 @@ def unmap(mmap): # WatchLink represents /head/watch link opened on wcfs. # # .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages. +# .close() closes the link. # -# XXX safe/not-safe to access from multiple threads? +# It is safe to use WatchLink from multiple threads simultaneously. class WatchLink(object): def __init__(wlink, wc): @@ -290,6 +295,7 @@ class WatchLink(object): wlink._rxtab = {} # stream -> rxq server replies go via here wlink._accepted = set() # of stream streams we accepted but did not replied yet + wlink._req_next = 1 # stream ID for next client-originated request XXX -> atomic wlink._txmu = threading.Lock() # serializes writes wlink._txclosed = False @@ -298,7 +304,7 @@ class WatchLink(object): wlink._serveWG.go(wlink._serveRX) # this tWatchLink currently watches the following files at particular state. - # XXX back -> tWatchLink ? + # XXX test only: back -> tWatchLink ? wlink._watching = {} # {} foid -> tWatch @@ -316,6 +322,7 @@ class WatchLink(object): wlink._wtx.close() wlink._txclosed = True + # close closes the link. def close(wlink): wlink._closeTX() wlink._serveCancel() @@ -410,8 +417,6 @@ class WatchLink(object): wlink._wtx.flush() # sendReq sends client -> server request and returns server reply. - # - # only 1 sendReq must be used at a time. # XXX relax? def sendReq(wlink, ctx, req): # -> reply | None when EOF rxq = wlink._sendReq(ctx, req) @@ -424,7 +429,9 @@ class WatchLink(object): return _rx def _sendReq(wlink, ctx, req): # -> rxq - stream = 1 # XXX -> dynamic + with wlink._txmu: # XXX -> atomic (currently uses arbitrary lock) + stream = wlink._req_next + wlink._req_next = (wlink._req_next + 2) & ((1<<64)-1) rxq = chan() with wlink._rxmu: @@ -435,8 +442,6 @@ class WatchLink(object): return rxq # recvReq receives client <- server request. - # - # multiple recvReq could be used at a time. def recvReq(wlink, ctx): # -> SrvReq | None when EOF _, _rx = select( ctx.done().recv, # 0