Commit 96a6b569 authored by Denis Bilenko's avatar Denis Bilenko

queue: add Channel class

parent 4fd76bf0
......@@ -42,7 +42,7 @@ from gevent.timeout import Timeout
from gevent.hub import get_hub, Waiter, getcurrent, _NONE
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue']
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue', 'Channel']
class Queue(object):
......@@ -335,3 +335,112 @@ class JoinableQueue(Queue):
unfinished tasks drops to zero, :meth:`join` unblocks.
'''
self._cond.wait()
class Channel(object):
def __init__(self):
self.getters = collections.deque()
self.putters = collections.deque()
self.hub = get_hub()
self._event_unlock = self.hub.loop.callback()
def __repr__(self):
return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._format())
def __str__(self):
return '<%s %s>' % (type(self).__name__, self._format())
def _format(self):
result = ''
if self.getters:
result += ' getters[%s]' % len(self.getters)
if self.putters:
result += ' putters[%s]' % len(self.putters)
return result
@property
def balance(self):
return len(self.putters) - len(self.getters)
def qsize(self):
return 0
def empty(self):
return True
def full(self):
return True
def put(self, item, block=True, timeout=None):
if self.hub is getcurrent():
if self.getters:
getter = self.getters.popleft()
getter.switch(item)
return
raise Full
if not block:
timeout = 0
waiter = Waiter()
self.putters.append((item, waiter))
timeout = Timeout.start_new(timeout, Full)
try:
if self.getters:
self._schedule_unlock()
result = waiter.get()
assert result is waiter, "Invalid switch into Channel.put: %r" % (result, )
except:
self.putters.remove(waiter)
raise
finally:
timeout.cancel()
def put_nowait(self, item):
self.put(item, False)
def get(self, block=True, timeout=None):
if self.hub is getcurrent():
if self.putters:
item, putter = self.putters.popleft()
self.hub.loop.run_callback(putter.switch, putter)
return item
if not block:
timeout = 0
waiter = Waiter()
timeout = Timeout.start_new(timeout, Empty)
try:
self.getters.append(waiter)
if self.putters:
self._schedule_unlock()
return waiter.get()
except:
self.getters.remove(waiter)
raise
finally:
timeout.cancel()
def get_nowait(self):
return self.get(False)
def _unlock(self):
while self.putters and self.getters:
getter = self.getters.popleft()
item, putter = self.putters.popleft()
getter.switch(item)
putter.switch(putter)
def _schedule_unlock(self):
self._event_unlock.start(self._unlock)
def __iter__(self):
return self
def next(self):
result = self.get()
if result is StopIteration:
raise result
return result
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment