[Neo-report] r2114 gregory - in /trunk/neo: ./ admin/ client/ client/handlers/ master/ mas...

nobody at svn.erp5.org nobody at svn.erp5.org
Sat May 15 09:55:31 CEST 2010


Author: gregory
Date: Sat May 15 09:55:30 2010
New Revision: 2114

Log:
Answer the partition table in one packet.

SendPartitionTable packets were sent between the Ask and Answer PartitionTable
packets, as notifications. In this case, the only purpose of the 'Answer'
was to check that the partition table was filled. The 'Ask' also allowed
requesting a subset of the partitions, but this was unused and redundant with
AskPartitionList for neoctl.

This commit includes the following work:
- The partition table is always sent in one packet.
- The full partition table is always requested with AskPartitionTable.
- The full partition table is notified with SendPartitionTable.
- The client node processes the answer in the bootstrap handler.
- Admin can receive answers *and* notifications for the partition table.
- Move the log calls to the pt.py module.
- Add pt.getRowList() to factorise the code.
- Build partition table packets outside the loop when possible.
- Always load the partition table unconditionally in the generic pt.py.
-

Modified:
    trunk/neo/admin/app.py
    trunk/neo/admin/handler.py
    trunk/neo/client/app.py
    trunk/neo/client/handlers/master.py
    trunk/neo/handler.py
    trunk/neo/master/app.py
    trunk/neo/master/handlers/__init__.py
    trunk/neo/master/recovery.py
    trunk/neo/protocol.py
    trunk/neo/pt.py
    trunk/neo/storage/app.py
    trunk/neo/storage/handlers/initialization.py
    trunk/neo/storage/handlers/verification.py
    trunk/neo/tests/client/testMasterHandler.py
    trunk/neo/tests/storage/testInitializationHandler.py
    trunk/neo/tests/storage/testVerificationHandler.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/admin/app.py
==============================================================================
--- trunk/neo/admin/app.py [iso-8859-1] (original)
+++ trunk/neo/admin/app.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -134,7 +134,7 @@
         # passive handler
         self.master_conn.setHandler(self.master_event_handler)
         self.master_conn.ask(Packets.AskNodeInformation())
-        self.master_conn.ask(Packets.AskPartitionTable([]))
+        self.master_conn.ask(Packets.AskPartitionTable())
 
     def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
         # we have a pt

Modified: trunk/neo/admin/handler.py
==============================================================================
--- trunk/neo/admin/handler.py [iso-8859-1] (original)
+++ trunk/neo/admin/handler.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -50,8 +50,7 @@
             if self.app.master_conn is None:
                 raise protocol.NotReadyError('Not connected to a primary ' \
                         'master.')
-            p = Packets.AskPartitionTable([])
-            msg_id = self.app.master_conn.ask(p)
+            msg_id = self.app.master_conn.ask(Packets.AskPartitionTable())
             app.dispatcher.register(msg_id, conn,
                                     {'min_offset' : min_offset,
                                      'max_offset' : max_offset,
@@ -146,18 +145,14 @@
         # implemented for factorize code (as done for bootstrap)
         logging.debug("answerNodeInformation")
 
-    def answerPartitionTable(self, conn, ptid, row_list):
-        # XXX: This will no more exists when the initialization module will be
-        # implemented for factorize code (as done for bootstrap)
-        logging.debug("answerPartitionTable")
-
     def notifyPartitionChanges(self, conn, ptid, cell_list):
         self.app.pt.update(ptid, cell_list, self.app.nm)
 
+    def answerPartitionTable(self, conn, ptid, row_list):
+        self.app.pt.load(ptid, row_list, self.app.nm)
+
     def sendPartitionTable(self, conn, ptid, row_list):
-        self.app.pt.clear()
         self.app.pt.load(ptid, row_list, self.app.nm)
-        self.app.pt.log()
 
     def notifyClusterInformation(self, conn, cluster_state):
         self.app.cluster_state = cluster_state
@@ -169,7 +164,7 @@
             # Re-ask partition table, in case node change filled it.
             # XXX: we should only ask it if received states indicates it is
             # possible (ignore TEMPORARILY_DOWN for example)
-            conn.ask(Packets.AskPartitionTable([]))
+            conn.ask(Packets.AskPartitionTable())
 
 class MasterRequestEventHandler(EventHandler):
     """ This class handle all answer from primary master node"""

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -368,7 +368,7 @@
                 msg_id = conn.ask(Packets.AskNodeInformation())
                 self._waitMessage(conn, msg_id,
                         handler=self.primary_bootstrap_handler)
-                msg_id = conn.ask(Packets.AskPartitionTable([]))
+                msg_id = conn.ask(Packets.AskPartitionTable())
                 self._waitMessage(conn, msg_id,
                         handler=self.primary_bootstrap_handler)
             ready = self.uuid is not None and self.pt is not None \

Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -81,7 +81,8 @@
             conn.close()
 
     def answerPartitionTable(self, conn, ptid, row_list):
-        pass
+        assert row_list
+        self.app.pt.load(ptid, row_list, self.app.nm)
 
     def answerNodeInformation(self, conn):
         pass
@@ -138,9 +139,6 @@
         if self.app.pt.filled():
             self.app.pt.update(ptid, cell_list, self.app.nm)
 
-    def sendPartitionTable(self, conn, ptid, row_list):
-        self.app.pt.load(ptid, row_list, self.app.nm)
-
     def notifyNodeInformation(self, conn, node_list):
         app = self.app
         self.app.nm.update(node_list)

Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -163,13 +163,10 @@
     def answerLastIDs(self, conn, loid, ltid, lptid):
         raise UnexpectedPacketError
 
-    def askPartitionTable(self, conn, offset_list):
+    def askPartitionTable(self, conn):
         raise UnexpectedPacketError
 
     def answerPartitionTable(self, conn, ptid, row_list):
-        raise UnexpectedPacketError
-
-    def sendPartitionTable(self, conn, ptid, row_list):
         raise UnexpectedPacketError
 
     def notifyPartitionChanges(self, conn, ptid, cell_list):
@@ -388,7 +385,6 @@
         d[Packets.AnswerLastIDs] = self.answerLastIDs
         d[Packets.AskPartitionTable] = self.askPartitionTable
         d[Packets.AnswerPartitionTable] = self.answerPartitionTable
-        d[Packets.SendPartitionTable] = self.sendPartitionTable
         d[Packets.NotifyPartitionChanges] = self.notifyPartitionChanges
         d[Packets.StartOperation] = self.startOperation
         d[Packets.StopOperation] = self.stopOperation

Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -278,30 +278,18 @@
         logging.debug('broadcastPartitionChanges')
         if not cell_list:
             return
+        self.pt.log()
         ptid = self.pt.setNextID()
-        self.pt.log()
+        packet = Packets.NotifyPartitionChanges(ptid, cell_list)
         for node in self.nm.getIdentifiedList():
             if not node.isRunning():
                 continue
             if node.isClient() or node.isStorage() or node.isAdmin():
-                node.notify(Packets.NotifyPartitionChanges(ptid, cell_list))
+                node.notify(packet)
 
     def outdateAndBroadcastPartition(self):
         " Outdate cell of non-working nodes and broadcast changes """
         self.broadcastPartitionChanges(self.pt.outdate())
-
-    def sendPartitionTable(self, conn):
-        """ Send the partition table through the given connection """
-        row_list = []
-        for offset in xrange(self.pt.getPartitions()):
-            row_list.append((offset, self.pt.getRow(offset)))
-            # Split the packet if too huge.
-            if len(row_list) == 1000:
-                conn.notify(Packets.SendPartitionTable(self.pt.getID(),
-                    row_list))
-                del row_list[:]
-        if row_list:
-            conn.notify(Packets.SendPartitionTable(self.pt.getID(), row_list))
 
     def sendNodesInformations(self, conn):
         """ Send informations on all nodes through the given connection """

Modified: trunk/neo/master/handlers/__init__.py
==============================================================================
--- trunk/neo/master/handlers/__init__.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/__init__.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -62,11 +62,10 @@
         self.app.sendNodesInformations(conn)
         conn.answer(Packets.AnswerNodeInformation())
 
-    def askPartitionTable(self, conn, offset_list):
-        assert len(offset_list) == 0
-        app = self.app
-        app.sendPartitionTable(conn)
-        conn.answer(Packets.AnswerPartitionTable(app.pt.getID(), []))
+    def askPartitionTable(self, conn):
+        ptid = self.app.pt.getID()
+        row_list = self.app.pt.getRowList()
+        conn.answer(Packets.AnswerPartitionTable(ptid, row_list))
 
 
 DISCONNECTED_STATE_DICT = {

Modified: trunk/neo/master/recovery.py
==============================================================================
--- trunk/neo/master/recovery.py [iso-8859-1] (original)
+++ trunk/neo/master/recovery.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -127,7 +127,7 @@
         if lptid > self.target_ptid:
             # something newer
             self.target_ptid = lptid
-            conn.ask(Packets.AskPartitionTable([]))
+            conn.ask(Packets.AskPartitionTable())
 
     def answerPartitionTable(self, conn, ptid, row_list):
         app = self.app
@@ -142,8 +142,11 @@
             raise ProtocolError('Invalid offset')
         else:
             notification = Packets.NotifyNodeInformation(new_nodes)
+            ptid = self.app.pt.getID()
+            row_list = self.app.pt.getRowList()
+            partition_table = Packets.SendPartitionTable(ptid, row_list)
             # notify the admin nodes
             for node in self.app.nm.getAdminList(only_identified=True):
                 node.notify(notification)
-                self.app.sendPartitionTable(node.getConnection())
+                node.notify(partition_table)
 

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -476,33 +476,9 @@
 
 class AskPartitionTable(Packet):
     """
-    Ask rows in a partition table that a storage node stores. Used to recover
-    information. PM -> S.
-    """
-    _header_format = '!L'
-    _list_entry_format = '!L'
-    _list_entry_len = calcsize(_list_entry_format)
-
-    def _encode(self, offset_list):
-        body = [pack(self._header_format, len(offset_list))]
-        list_entry_format = self._list_entry_format
-        for offset in offset_list:
-            body.append(pack(list_entry_format, offset))
-        return ''.join(body)
-
-    def _decode(self, body):
-        packet_offset = self._header_len
-        (n,) = unpack(self._header_format, body[:packet_offset])
-        offset_list = []
-        list_entry_len = self._list_entry_len
-        list_entry_format = self._list_entry_format
-        for _ in xrange(n):
-            next_packet_offset = packet_offset + list_entry_len
-            offset = unpack(list_entry_format,
-                body[packet_offset:next_packet_offset])[0]
-            packet_offset = next_packet_offset
-            offset_list.append(offset)
-        return (offset_list,)
+    Ask the full partition table. PM -> S.
+    """
+    pass
 
 class AnswerPartitionTable(Packet):
     """

Modified: trunk/neo/pt.py
==============================================================================
--- trunk/neo/pt.py [iso-8859-1] (original)
+++ trunk/neo/pt.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -195,19 +195,20 @@
     def load(self, ptid, row_list, nm):
         """
         Load the partition table with the specified PTID, discard all previous
-        content and can be done in multiple calls
-        """
-        if ptid != self.id:
-            self.clear()
-            self.id = ptid
+        content.
+        """
+        self.clear()
+        self.id = ptid
         for offset, row in row_list:
-            if offset >= self.getPartitions() or self.hasOffset(offset):
+            if offset >= self.getPartitions():
                 raise IndexError
             for uuid, state in row:
                 node = nm.getByUUID(uuid)
                 # the node must be known by the node manager
                 assert node is not None
                 self.setCell(offset, node, state)
+        logging.debug('partition table loaded')
+        self.log()
 
     def update(self, ptid, cell_list, nm):
         """
@@ -299,6 +300,10 @@
             return []
         return [(cell.getUUID(), cell.getState()) for cell in row]
 
+    def getRowList(self):
+        getRow = self.getRow
+        return [(x, getRow(x)) for x in xrange(self.np)]
+
 
 def thread_safe(method):
     def wrapper(self, *args, **kwargs):

Modified: trunk/neo/storage/app.py
==============================================================================
--- trunk/neo/storage/app.py [iso-8859-1] (original)
+++ trunk/neo/storage/app.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -247,7 +247,7 @@
         self.pt.clear()
         self.master_conn.ask(Packets.AskLastIDs())
         self.master_conn.ask(Packets.AskNodeInformation())
-        self.master_conn.ask(Packets.AskPartitionTable(()))
+        self.master_conn.ask(Packets.AskPartitionTable())
         while not self.has_node_information or not self.has_partition_table \
                 or not self.has_last_ids:
             self.em.poll(1)

Modified: trunk/neo/storage/handlers/initialization.py
==============================================================================
--- trunk/neo/storage/handlers/initialization.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/initialization.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -29,15 +29,10 @@
         # the whole node list is received here
         BaseMasterHandler.notifyNodeInformation(self, conn, node_list)
 
-    def sendPartitionTable(self, conn, ptid, row_list):
-        """A primary master node sends this packet to synchronize a partition
-        table. Note that the message can be split into multiple packets."""
-        self.app.pt.load(ptid, row_list, self.app.nm)
-
     def answerPartitionTable(self, conn, ptid, row_list):
         app = self.app
         pt = app.pt
-        assert not row_list
+        pt.load(ptid, row_list, self.app.nm)
         if not pt.filled():
             raise protocol.ProtocolError('Partial partition table received')
         logging.debug('Got the partition table :')
@@ -68,9 +63,6 @@
     def notifyPartitionChanges(self, conn, ptid, cell_list):
         # XXX: This is safe to ignore those notifications because all of the
         # following applies:
-        # - master is monothreaded (notifyPartitionChanges cannot happen
-        #   between sendPartitionTable/answerPartitionTable packets), so
-        #   receiving the whole partition table is atomic
         # - we first ask for node information, and *then* partition
         #   table content, so it is possible to get notifyPartitionChanges
         #   packets in between (or even before asking for node information).

Modified: trunk/neo/storage/handlers/verification.py
==============================================================================
--- trunk/neo/storage/handlers/verification.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/verification.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -37,20 +37,10 @@
             tid = None
         conn.answer(Packets.AnswerLastIDs(oid, tid, app.pt.getID()))
 
-    def askPartitionTable(self, conn, offset_list):
-        if not offset_list:
-            # all is requested
-            offset_list = xrange(0, self.app.pt.getPartitions())
-        else:
-            if max(offset_list) >= self.app.pt.getPartitions():
-                raise ProtocolError('invalid partition table offset')
-
-        # build a table with requested partitions
-        row_list = [(offset, [(cell.getUUID(), cell.getState())
-            for cell in self.app.pt.getCellList(offset)])
-            for offset in offset_list]
-
-        conn.answer(Packets.AnswerPartitionTable(self.app.pt.getID(), row_list))
+    def askPartitionTable(self, conn):
+        ptid = self.app.pt.getID()
+        row_list = self.app.pt.getRowList()
+        conn.answer(Packets.AnswerPartitionTable(ptid, row_list))
 
     def notifyPartitionChanges(self, conn, ptid, cell_list):
         """This is very similar to Send Partition Table, except that

Modified: trunk/neo/tests/client/testMasterHandler.py
==============================================================================
--- trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/client/testMasterHandler.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -123,6 +123,16 @@
         self.assertEqual(self.app.trying_master_node, None)
         self.assertTrue(self.app.primary_master_node is node)
         self.checkClosed(conn)
+
+    def test_answerPartitionTable(self):
+        conn = self.getConnection()
+        self.app.pt = Mock()
+        ptid = 0
+        row_list = ([], [])
+        self.handler.answerPartitionTable(conn, ptid, row_list)
+        load_calls = self.app.pt.mockGetNamedCalls('load')
+        self.assertEqual(len(load_calls), 1)
+        # load_calls[0].checkArgs(ptid, row_list, self.app.nm)
 
 
 class MasterNotificationsHandlerTests(MasterHandlerTests):
@@ -168,16 +178,6 @@
         self.assertEqual(len(update_calls), 1)
         update_calls[0].checkArgs(ptid, cell_list, self.app.nm)
 
-    def test_sendPartitionTable(self):
-        conn = self.getConnection()
-        self.app.pt = Mock()
-        ptid = 0
-        row_list = (Mock(), Mock())
-        self.handler.sendPartitionTable(conn, ptid, row_list)
-        load_calls = self.app.pt.mockGetNamedCalls('load')
-        self.assertEqual(len(load_calls), 1)
-        load_calls[0].checkArgs(ptid, row_list, self.app.nm)
-        
     def test_notifyNodeInformation(self):
         conn = self.getConnection()
         addr = ('127.0.0.1', 1000)

Modified: trunk/neo/tests/storage/testInitializationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testInitializationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testInitializationHandler.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -21,7 +21,7 @@
 from neo.pt import PartitionTable
 from neo.storage.app import Application
 from neo.storage.handlers.initialization import InitializationHandler
-from neo.protocol import CellStates
+from neo.protocol import CellStates, ProtocolError
 from neo.exception import PrimaryFailure
 
 class StorageInitializationHandlerTests(NeoTestBase):
@@ -71,7 +71,7 @@
         # nothing happens
         self.checkNoPacketSent(conn)
 
-    def test_09_sendPartitionTable(self):
+    def test_09_answerPartitionTable(self):
         # send a table
         conn = self.getClientConnection()
         self.app.pt = PartitionTable(3, 2)
@@ -87,20 +87,8 @@
                     (1, ((node_3, CellStates.UP_TO_DATE), (node_1, CellStates.UP_TO_DATE))),
                     (2, ((node_2, CellStates.UP_TO_DATE), (node_3, CellStates.UP_TO_DATE)))]
         self.assertFalse(self.app.pt.filled())
-        # send part of the table, won't be filled
-        self.verification.sendPartitionTable(conn, 1, row_list[:1])
-        self.assertFalse(self.app.pt.filled())
-        self.assertEqual(self.app.pt.getID(), 1)
-        self.assertEqual(self.app.dm.getPartitionTable(), [])
-        # send remaining of the table (ack with AnswerPartitionTable)
-        self.verification.sendPartitionTable(conn, 1, row_list[1:])
-        self.verification.answerPartitionTable(conn, 1, [])
-        self.assertTrue(self.app.pt.filled())
-        self.assertEqual(self.app.pt.getID(), 1)
-        self.assertNotEqual(self.app.dm.getPartitionTable(), [])
         # send a complete new table and ack
-        self.verification.sendPartitionTable(conn, 2, row_list)
-        self.verification.answerPartitionTable(conn, 2, [])
+        self.verification.answerPartitionTable(conn, 2, row_list)
         self.assertTrue(self.app.pt.filled())
         self.assertEqual(self.app.pt.getID(), 2)
         self.assertNotEqual(self.app.dm.getPartitionTable(), [])

Modified: trunk/neo/tests/storage/testVerificationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testVerificationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testVerificationHandler.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -121,19 +121,6 @@
         self.assertEqual(ptid, self.app.pt.getID())
 
     def test_08_askPartitionTable(self):
-        # try to get unknown offset
-        self.assertEqual(len(self.app.pt.getNodeList()), 0)
-        self.assertFalse(self.app.pt.hasOffset(1))
-        self.assertEqual(len(self.app.pt.getCellList(1)), 0)
-        conn = self.getClientConnection()
-        self.verification.askPartitionTable(conn, [1])
-        ptid, row_list = self.checkAnswerPartitionTable(conn, decode=True)
-        self.assertEqual(len(row_list), 1)
-        offset, rows = row_list[0]
-        self.assertEqual(offset, 1)
-        self.assertEqual(len(rows), 0)
-
-        # try to get known offset
         node = self.app.nm.createStorage(
             address=("127.7.9.9", 1),
             uuid=self.getNewUUID()
@@ -141,12 +128,9 @@
         self.app.pt.setCell(1, node, CellStates.UP_TO_DATE)
         self.assertTrue(self.app.pt.hasOffset(1))
         conn = self.getClientConnection()
-        self.verification.askPartitionTable(conn, [1])
+        self.verification.askPartitionTable(conn)
         ptid, row_list = self.checkAnswerPartitionTable(conn, decode=True)
-        self.assertEqual(len(row_list), 1)
-        offset, rows = row_list[0]
-        self.assertEqual(offset, 1)
-        self.assertEqual(len(rows), 1)
+        self.assertEqual(len(row_list), 1009)
 
     def test_10_notifyPartitionChanges(self):
         # old partition change

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Sat May 15 09:55:30 2010
@@ -144,10 +144,7 @@
         self.assertEqual(lptid, ptid)
 
     def test_20_askPartitionTable(self):
-        offset_list = [1, 523, 6, 124]
-        p = Packets.AskPartitionTable(offset_list)
-        p_offset_list  = p.decode()[0]
-        self.assertEqual(offset_list, p_offset_list)
+        self.assertEqual(Packets.AskPartitionTable().decode(), ())
 
     def test_21_answerPartitionTable(self):
         ptid = self.getNextTID()





More information about the Neo-report mailing list