[Neo-report] r2142 vincent - in /trunk/neo: ./ client/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Jun 7 15:47:40 CEST 2010
Author: vincent
Date: Mon Jun 7 15:47:26 2010
New Revision: 2142
Log:
When asking the dispatcher to forget a packet, wake its queue.
This fixes cases where a thread is waiting for an answer from the queue and
a packet gets forgotten: the thread would block until the next expected packet
arrives (and if there is no next packet, it would wait forever).
Modified:
trunk/neo/client/app.py
trunk/neo/dispatcher.py
trunk/neo/tests/testDispatcher.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Mon Jun 7 15:47:26 2010
@@ -41,7 +41,7 @@
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
from neo.exception import NeoException
from neo.client.handlers import storage, master
-from neo.dispatcher import Dispatcher
+from neo.dispatcher import Dispatcher, ForgottenPacket
from neo.client.poll import ThreadedPoll
from neo.client.iterator import Iterator
from neo.client.mq import MQ
@@ -227,8 +227,8 @@
conn, packet = get(block)
except Empty:
break
- if packet is None:
- # connection was closed
+ if packet is None or isinstance(packet, ForgottenPacket):
+ # connection was closed or some packet was forgotten
continue
block = False
try:
@@ -243,14 +243,18 @@
_handlePacket = self._handlePacket
while True:
conn, packet = get(True)
+ is_forgotten = isinstance(packet, ForgottenPacket)
if target_conn is conn:
# check fake packet
if packet is None:
raise ConnectionClosed
if msg_id == packet.getId():
+ if is_forgotten:
+ raise ValueError, 'ForgottenPacket for an ' \
+ 'explicitely expected packet.'
self._handlePacket(conn, packet, handler=handler)
break
- elif packet is not None:
+ elif not is_forgotten and packet is not None:
self._handlePacket(conn, packet)
@profiler_decorator
Modified: trunk/neo/dispatcher.py
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/dispatcher.py [iso-8859-1] Mon Jun 7 15:47:26 2010
@@ -19,6 +19,18 @@
from neo.profiling import profiler_decorator
EMPTY = {}
NOBODY = []
+
+class ForgottenPacket(object):
+ """
+ Instances of this class will be pushed to queue when an expected answer
+ is being forgotten. Its purpose is similar to pushing "None" when
+ connection is closed, but the meaning is different.
+ """
+ def __init__(self, msg_id):
+ self.msg_id = msg_id
+
+ def getId(self):
+ return self.msg_id
def giant_lock(func):
def wrapped(self, *args, **kw):
@@ -95,6 +107,7 @@
if queue is NOBODY:
raise KeyError, 'Already expected by NOBODY: %r, %r' % (
conn, msg_id)
+ queue.put((conn, ForgottenPacket(msg_id)))
self.queue_dict[id(queue)] -= 1
message_table[msg_id] = NOBODY
Modified: trunk/neo/tests/testDispatcher.py
==============================================================================
--- trunk/neo/tests/testDispatcher.py [iso-8859-1] (original)
+++ trunk/neo/tests/testDispatcher.py [iso-8859-1] Mon Jun 7 15:47:26 2010
@@ -18,7 +18,7 @@
import unittest
from mock import Mock
-from neo.dispatcher import Dispatcher
+from neo.dispatcher import Dispatcher, ForgottenPacket
from Queue import Queue
class DispatcherTests(unittest.TestCase):
@@ -116,6 +116,13 @@
self.dispatcher.register(conn, 1, queue)
# ...and forget about it
self.dispatcher.forget(conn, 1)
+ # A ForgottenPacket must have been put in the queue
+ queue_conn, packet = queue.get(block=False)
+ self.assertTrue(isinstance(packet, ForgottenPacket), packet)
+ # ...with appropriate packet id
+ self.assertEqual(packet.getId(), 1)
+ # ...and appropriate connection
+ self.assertTrue(conn is queue_conn, (conn, queue_conn))
# If forgotten twice, it must raise a KeyError
self.assertRaises(KeyError, self.dispatcher.forget, conn, 1)
# Event arrives, return value must be True (it was expected)
@@ -127,6 +134,7 @@
self.dispatcher.register(conn, 1, queue)
# ...and forget about it
self.dispatcher.forget(conn, 1)
+ queue.get(block=False)
# No exception must happen if connection is lost.
self.dispatcher.unregister(conn)
# Forgotten message's queue must not have received a "None"
More information about the Neo-report
mailing list