[Neo-report] r2142 vincent - in /trunk/neo: ./ client/ tests/

nobody at svn.erp5.org nobody at svn.erp5.org
Mon Jun 7 15:47:40 CEST 2010


Author: vincent
Date: Mon Jun  7 15:47:26 2010
New Revision: 2142

Log:
When asking dispatcher to forget a packet, wake its queue

This fixes cases where a thread is expecting some answer from queue, and
a packet gets forgotten, as it would block until next expected packet
arrives (if there is no next packet, it will wait forever).

Modified:
    trunk/neo/client/app.py
    trunk/neo/dispatcher.py
    trunk/neo/tests/testDispatcher.py

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Mon Jun  7 15:47:26 2010
@@ -41,7 +41,7 @@
 from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
 from neo.exception import NeoException
 from neo.client.handlers import storage, master
-from neo.dispatcher import Dispatcher
+from neo.dispatcher import Dispatcher, ForgottenPacket
 from neo.client.poll import ThreadedPoll
 from neo.client.iterator import Iterator
 from neo.client.mq import MQ
@@ -227,8 +227,8 @@
                 conn, packet = get(block)
             except Empty:
                 break
-            if packet is None:
-                # connection was closed
+            if packet is None or isinstance(packet, ForgottenPacket):
+                # connection was closed or some packet was forgotten
                 continue
             block = False
             try:
@@ -243,14 +243,18 @@
         _handlePacket = self._handlePacket
         while True:
             conn, packet = get(True)
+            is_forgotten = isinstance(packet, ForgottenPacket)
             if target_conn is conn:
                 # check fake packet
                 if packet is None:
                     raise ConnectionClosed
                 if msg_id == packet.getId():
+                    if is_forgotten:
+                        raise ValueError, 'ForgottenPacket for an ' \
+                            'explicitely expected packet.'
                     self._handlePacket(conn, packet, handler=handler)
                     break
-            elif packet is not None:
+            elif not is_forgotten and packet is not None:
                 self._handlePacket(conn, packet)
 
     @profiler_decorator

Modified: trunk/neo/dispatcher.py
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/dispatcher.py [iso-8859-1] Mon Jun  7 15:47:26 2010
@@ -19,6 +19,18 @@
 from neo.profiling import profiler_decorator
 EMPTY = {}
 NOBODY = []
+
+class ForgottenPacket(object):
+    """
+      Instances of this class will be pushed to queue when an expected answer
+      is being forgotten. Its purpose is similar to pushing "None" when
+      connection is closed, but the meaning is different.
+    """
+    def __init__(self, msg_id):
+        self.msg_id = msg_id
+
+    def getId(self):
+        return self.msg_id
 
 def giant_lock(func):
     def wrapped(self, *args, **kw):
@@ -95,6 +107,7 @@
         if queue is NOBODY:
             raise KeyError, 'Already expected by NOBODY: %r, %r' % (
                 conn, msg_id)
+        queue.put((conn, ForgottenPacket(msg_id)))
         self.queue_dict[id(queue)] -= 1
         message_table[msg_id] = NOBODY
 

Modified: trunk/neo/tests/testDispatcher.py
==============================================================================
--- trunk/neo/tests/testDispatcher.py [iso-8859-1] (original)
+++ trunk/neo/tests/testDispatcher.py [iso-8859-1] Mon Jun  7 15:47:26 2010
@@ -18,7 +18,7 @@
 import unittest
 
 from mock import Mock
-from neo.dispatcher import Dispatcher
+from neo.dispatcher import Dispatcher, ForgottenPacket
 from Queue import Queue
 
 class DispatcherTests(unittest.TestCase):
@@ -116,6 +116,13 @@
         self.dispatcher.register(conn, 1, queue)
         # ...and forget about it
         self.dispatcher.forget(conn, 1)
+        # A ForgottenPacket must have been put in the queue
+        queue_conn, packet = queue.get(block=False)
+        self.assertTrue(isinstance(packet, ForgottenPacket), packet)
+        # ...with appropriate packet id
+        self.assertEqual(packet.getId(), 1)
+        # ...and appropriate connection
+        self.assertTrue(conn is queue_conn, (conn, queue_conn))
         # If forgotten twice, it must raise a KeyError
         self.assertRaises(KeyError, self.dispatcher.forget, conn, 1)
         # Event arrives, return value must be True (it was expected)
@@ -127,6 +134,7 @@
         self.dispatcher.register(conn, 1, queue)
         # ...and forget about it
         self.dispatcher.forget(conn, 1)
+        queue.get(block=False)
         # No exception must happen if connection is lost.
         self.dispatcher.unregister(conn)
         # Forgotten message's queue must not have received a "None"





More information about the Neo-report mailing list