Commit ad7231a7 authored by Guillaume Hervier's avatar Guillaume Hervier

Fix virtual files concurrent accesses problems

parent 949644a6
......@@ -23,6 +23,7 @@
from __future__ import generators
import types, os, tempfile, cPickle, shutil, traceback, \
socket, sys, gzip, threading
import sync
from pool import Pool
# The following EA and ACL modules may be used if available
try: import xattr
......@@ -289,51 +290,10 @@ class LowLevelPipeConnection(Connection):
self.inpipe.close()
class RequestNumberRegistry(object):
def __init__(self):
self._lock = threading.RLock()
self._next = 0
self._entries = {}
def get(self):
with self._lock:
if self._next >= 256:
return None
req_num = self._next
self.insert(req_num)
return req_num
def insert(self, req_num):
with self._lock:
if req_num in self._entries:
# Vacant slot
self._next = self._entries[req_num]
else:
self._next += 1
def remove(self, req_num):
with self._lock:
self._entries[req_num] = self._next
self._next = req_num
class AsyncRequest(object):
class AsyncRequest(sync.AsyncValue):
def __init__(self, req_num):
sync.AsyncValue.__init__(self)
self.req_num = req_num
self.value = None
self.completed = threading.Event()
def set(self, value):
self.value = value
self.completed.set()
def get(self):
while not self.completed.is_set():
self.completed.wait()
return self.value
class PipeConnection(LowLevelPipeConnection):
......@@ -369,7 +329,7 @@ class PipeConnection(LowLevelPipeConnection):
"""
LowLevelPipeConnection.__init__(self, inpipe, outpipe)
self.conn_number = conn_number
self.request_numbers = RequestNumberRegistry()
self.request_numbers = sync.IndexGenerator(maxvalue=256)
self.requests = {}
self.pool = Pool(processes=4,
max_taskqueue_size=16)
......@@ -571,37 +531,38 @@ class VirtualFile:
#### The following are used by the server
vfiles = {}
counter = 0
id_generator = sync.IndexGenerator()
@classmethod
def getbyid(cls, id):
return cls.vfiles[id]
getbyid = classmethod(getbyid)
@classmethod
def readfromid(cls, id, length):
if length is None: return cls.vfiles[id].read()
else: return cls.vfiles[id].read(length)
readfromid = classmethod(readfromid)
@classmethod
def readlinefromid(cls, id):
return cls.vfiles[id].readline()
readlinefromid = classmethod(readlinefromid)
@classmethod
def writetoid(cls, id, buffer):
return cls.vfiles[id].write(buffer)
writetoid = classmethod(writetoid)
@classmethod
def closebyid(cls, id):
fp = cls.vfiles[id]
cls.id_generator.remove(id)
del cls.vfiles[id]
return fp.close()
closebyid = classmethod(closebyid)
@classmethod
def new(cls, fileobj):
"""Associate a new VirtualFile with a read fileobject, return id"""
count = cls.counter
cls.vfiles[count] = fileobj
cls.counter = count + 1
return count
new = classmethod(new)
id = cls.id_generator.get()
cls.vfiles[id] = fileobj
return id
#### And these are used by the client
......
'''
Threading sync utilities
'''
import threading
class IndexGenerator(object):
def __init__(self, maxvalue=None):
self._lock = threading.RLock()
self._next = 0
self._entries = {}
self._maxvalue = maxvalue
def get(self):
with self._lock:
if self._maxvalue is not None and self._next >= self._maxvalue:
return None
index = self._next
self.insert(index)
return index
def insert(self, index):
with self._lock:
if index in self._entries:
# Vacant slot
self._next = self._entries[index]
else:
self._next += 1
def remove(self, index):
with self._lock:
self._entries[index] = self._next
self._next = index
class AsyncValue(object):
def __init__(self):
self.value = None
self.completed = threading.Event()
def set(self, value):
self.value = value
self.completed.set()
def get(self):
while not self.completed.is_set():
self.completed.wait()
return self.value
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment