Commit 9465bc7d authored by Denis Bilenko's avatar Denis Bilenko

Queue: do not make items that did not yet received a slot participate in the delivery order

although the feature is nice, it's did not work properly, e.g. for PriorityQueue.
neither the standard Queue nor coros.channel/queue do not support it and so are we now.
parent 9300516a
......@@ -3,7 +3,7 @@ import heapq
import collections
from Queue import Full, Empty
from gevent.greenlet import Timeout, Waiter, get_hub, getcurrent
from gevent.greenlet import Timeout, Waiter, get_hub, getcurrent, _NONE
from gevent import core
......@@ -38,20 +38,6 @@ class Queue(object):
def _put(self, item):
self.queue.append(item)
def _remove(self, item):
try:
remove = self.queue.remove
except AttributeError:
deque_remove(self.queue, item)
else:
try:
remove(item)
except ValueError:
pass
def _peek(self):
return self.queue[0]
def __repr__(self):
return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._format())
......@@ -79,15 +65,14 @@ class Queue(object):
Queue is not empty if there are greenlets blocking on put()
"""
return not self.qsize()
return not self.qsize() and not self.putters
def full(self):
"""Return True if the queue is full, False otherwise.
Queue is not full if there are greenlets blocking on get()
Queue(None) is never full.
"""
return (self.qsize()-len(self.getters)) >= self.maxsize
return self.qsize() + len(self.putters) - len(self.getters) >= self.maxsize
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
......@@ -101,47 +86,32 @@ class Queue(object):
is ignored in that case).
"""
if self.maxsize is None or self.qsize() < self.maxsize:
# there's a free slot, put an item right away
self._put(item)
if self.getters:
self._schedule_unlock()
elif self.getters and not block and get_hub().greenlet is getcurrent():
# special case to make get_nowait() runnable in the mainloop greenlet
putter = _ItemProxy(item, Waiter())
self._put(putter)
try:
while self.getters:
getter = self.getters.pop()
if getter.waiting:
item = self._get()
if isinstance(item, _ItemProxy):
realitem, putter_waiter = item
else:
realitem = item
getter.switch(realitem)
if isinstance(item, _ItemProxy):
if putter_waiter.waiting:
# unlock the greenlet calling put()
putter_waiter.switch(putter_waiter)
return
raise Full
except:
self._remove(putter)
raise
elif not block and get_hub().greenlet is getcurrent():
# we're in the mainloop, so we cannot wait; we can switch() to other greenlets though
# find a getter and deliver an item to it
while self.getters:
getter = self.getters.pop()
if getter:
self._put(item)
item = self._get()
getter.switch(item)
return
raise Full
elif block:
waiter = Waiter()
putter = _ItemProxy(item, waiter)
waiter = ItemWaiter(item)
self.putters.add(waiter)
timeout = Timeout(timeout, Full)
try:
self._put(putter)
try:
if self.getters:
self._schedule_unlock()
result = waiter.wait()
assert result is waiter, "Invalid switch into Queue.put: %r" % (result, )
except:
self._remove(putter)
raise
if self.getters:
self._schedule_unlock()
result = waiter.wait()
assert result is waiter, "Invalid switch into Queue.put: %r" % (result, )
if waiter.item is not _NONE:
self._put(item)
finally:
timeout.cancel()
self.putters.discard(waiter)
......@@ -167,31 +137,32 @@ class Queue(object):
available, else raise the Empty exception ('timeout' is ignored
in that case).
"""
if self.qsize() and not isinstance(self._peek(), _ItemProxy):
if self.qsize():
if self.putters:
self._schedule_unlock()
return self._get()
elif self.qsize() and not block and get_hub().greenlet is getcurrent():
elif not block and get_hub().greenlet is getcurrent():
# special case to make get_nowait() runnable in the mainloop greenlet
item = self._get()
realitem, putter_waiter = item
putter_waiter.switch(putter_waiter)
return realitem
elif block or self.qsize():
if not block:
timeout = 0
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
putter = self.putters.pop()
if putter:
# obtain the item directly from putter bypassing the queue
item = putter.item
putter.item = _NONE
putter.switch(putter)
return item
raise Empty
elif block:
waiter = Waiter()
timeout = Timeout(timeout, Empty)
self.getters.add(waiter)
try:
self.getters.add(waiter)
try:
if self.qsize():
self._schedule_unlock()
return waiter.wait()
except:
self.getters.discard(waiter)
raise
if self.putters:
self._schedule_unlock()
return waiter.wait()
finally:
self.getters.discard(waiter)
timeout.cancel()
else:
raise Empty
......@@ -209,25 +180,28 @@ class Queue(object):
while True:
if self.qsize() and self.getters:
getter = self.getters.pop()
if getter.waiting:
if getter:
try:
item = self._get()
except:
getter.throw(*sys.exc_info())
else:
if isinstance(item, _ItemProxy):
realitem, putter_waiter = item
else:
realitem = item
getter.switch(realitem)
if isinstance(item, _ItemProxy):
if putter_waiter.waiting:
# unlock the greenlet calling put()
putter_waiter.switch(putter_waiter)
elif self.putters and self.qsize()-len(self.putters) < self.maxsize:
getter.switch(item)
elif self.putters and self.getters:
putter = self.putters.pop()
if putter:
getter = self.getters.pop()
if getter:
# deliver the item from putter to getter bypassing the queue
item = putter.item
putter.item = _NONE
getter.switch(item)
putter.switch(putter)
else:
self.putters.add(putter)
elif self.putters and (self.getters or self.qsize() < self.maxsize):
putter = self.putters.pop()
if putter.waiting:
putter.switch(putter)
putter.switch(putter)
else:
break
finally:
......@@ -242,11 +216,12 @@ class Queue(object):
# QQQ re-active event instead of creating a new one each time
def deque_remove(deque, item):
for index, x in enumerate(deque):
if x==item:
del deque[index]
return
class ItemWaiter(Waiter):
__slots__ = ['item']
def __init__(self, item):
Waiter.__init__(self)
self.item = item
class PriorityQueue(Queue):
......@@ -277,34 +252,3 @@ class LifoQueue(Queue):
def _get(self):
return self.queue.pop()
class _ItemProxy(object):
def __init__(self, item, waiter):
self._item = item
self._waiter = waiter
def __cmp__(self, other):
return cmp(self._item, other)
def __nonzero__(self):
return bool(self._item)
def __getattr__(self, item):
return getattr(self._item, item)
def __hash__(self):
return self._item.__hash__()
def __repr__(self):
return '_ItemProxy(%r, %r)' % (self._item, self._waiter)
def __str__(self):
return self._item.__str__()
# XXX: other methods
def __iter__(self):
yield self._item
yield self._waiter
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment