[Neo-report] r2835 jm - in /trunk/neo: admin/ master/ tests/master/ tests/threaded/

nobody at svn.erp5.org nobody at svn.erp5.org
Thu Sep 8 19:51:13 CEST 2011


Author: jm
Date: Thu Sep  8 19:51:13 2011
New Revision: 2835

Log:
Fix handling of incoming non-empty storage nodes just after startup is allowed

Modified:
    trunk/neo/admin/handler.py
    trunk/neo/master/recovery.py
    trunk/neo/tests/master/testRecovery.py
    trunk/neo/tests/threaded/test.py

Modified: trunk/neo/admin/handler.py
==============================================================================
--- trunk/neo/admin/handler.py [iso-8859-1] (original)
+++ trunk/neo/admin/handler.py [iso-8859-1] Thu Sep  8 19:51:13 2011
@@ -157,11 +157,6 @@ class MasterEventHandler(EventHandler):
     def notifyNodeInformation(self, conn, node_list):
         app = self.app
         app.nm.update(node_list)
-        if not app.pt.filled():
-            # Re-ask partition table, in case node change filled it.
-            # XXX: we should only ask it if received states indicates it is
-            # possible (ignore TEMPORARILY_DOWN for example)
-            conn.ask(Packets.AskPartitionTable())
 
 class MasterRequestEventHandler(EventHandler):
     """ This class handle all answer from primary master node"""

Modified: trunk/neo/master/recovery.py
==============================================================================
--- trunk/neo/master/recovery.py [iso-8859-1] (original)
+++ trunk/neo/master/recovery.py [iso-8859-1] Thu Sep  8 19:51:13 2011
@@ -23,7 +23,6 @@ from neo.lib.protocol import Packets, Pr
 from neo.lib.protocol import NotReadyError, ZERO_OID, ZERO_TID
 from neo.master.handlers import MasterHandler
 
-REQUIRED_NODE_NUMBER = 1
 
 class RecoveryManager(MasterHandler):
     """
@@ -42,14 +41,7 @@ class RecoveryManager(MasterHandler):
         """
             Returns the handler for storage nodes
         """
-        # XXX: Looking at 'uuid' is not a good criteria to know if the storage
-        #      is empty. Empty node should be accepted here.
-        #      This is also the first step to fix handling of incoming
-        #      non-empty storage nodes, whereas startup was already allowed.
-        if uuid is None and not self.app._startup_allowed:
-            neo.lib.logging.info('reject empty storage node')
-            raise NotReadyError
-        return (uuid, NodeStates.RUNNING, self)
+        return uuid, NodeStates.PENDING, self
 
     def run(self):
         """
@@ -67,22 +59,37 @@ class RecoveryManager(MasterHandler):
         self.app.pt.setID(None)
 
         # collect the last partition table available
-        while not self.app._startup_allowed:
+        while 1:
             em.poll(1)
+            if self.app._startup_allowed:
+                allowed_node_set = set()
+                for node in self.app.nm.getStorageList():
+                    if node.isPending():
+                        break # waiting for an answer
+                    if node.isRunning():
+                        allowed_node_set.add(node)
+                else:
+                    if allowed_node_set:
+                        break # no ready storage node
 
         neo.lib.logging.info('startup allowed')
 
-        # build a new partition table
         if self.app.pt.getID() is None:
-            self.buildFromScratch()
+            neo.lib.logging.info('creating a new partition table')
+            # reset IDs generators & build new partition with running nodes
+            self.app.tm.setLastOID(ZERO_OID)
+            self.app.pt.make(allowed_node_set)
+            self._broadcastPartitionTable(self.app.pt.getID(),
+                                          self.app.pt.getRowList())
 
         # collect node that are connected but not in the selected partition
         # table and set them in pending state
-        allowed_node_set = set(self.app.pt.getNodeList())
-        refused_node_set = set(self.app.nm.getStorageList()) - allowed_node_set
-        for node in refused_node_set:
-            node.setPending()
-        self.app.broadcastNodesInformation(refused_node_set)
+        refused_node_set = allowed_node_set.difference(
+            self.app.pt.getNodeList())
+        if refused_node_set:
+            for node in refused_node_set:
+                node.setPending()
+            self.app.broadcastNodesInformation(refused_node_set)
 
         self.app.setLastTransaction(self.app.tm.getLastTID())
         neo.lib.logging.debug(
@@ -90,23 +97,6 @@ class RecoveryManager(MasterHandler):
                         'table :', dump(self.app.tm.getLastOID()))
         self.app.pt.log()
 
-    def buildFromScratch(self):
-        nm, em, pt = self.app.nm, self.app.em, self.app.pt
-        neo.lib.logging.debug('creating a new partition table, wait for a ' \
-            'storage node')
-        # wait for some empty storage nodes, their are accepted
-        while len(nm.getStorageList()) < REQUIRED_NODE_NUMBER:
-            em.poll(1)
-        # take the first node available
-        node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER]
-        for node in node_list:
-            node.setRunning()
-        self.app.broadcastNodesInformation(node_list)
-        # resert IDs generators
-        self.app.tm.setLastOID(ZERO_OID)
-        # build the partition with this node
-        pt.make(node_list)
-
     def connectionLost(self, conn, new_state):
         node = self.app.nm.getByUUID(conn.getUUID())
         assert node is not None
@@ -117,10 +107,8 @@ class RecoveryManager(MasterHandler):
         self.app.broadcastNodesInformation([node])
 
     def connectionCompleted(self, conn):
-        # XXX: handler split review needed to remove this hack
-        if not self.app._startup_allowed:
-            # ask the last IDs to perform the recovery
-            conn.ask(Packets.AskLastIDs())
+        # ask the last IDs to perform the recovery
+        conn.ask(Packets.AskLastIDs())
 
     def answerLastIDs(self, conn, loid, ltid, lptid):
         # Get max values.
@@ -132,13 +120,25 @@ class RecoveryManager(MasterHandler):
             # something newer
             self.target_ptid = lptid
             conn.ask(Packets.AskPartitionTable())
+        else:
+            node = self.app.nm.getByUUID(conn.getUUID())
+            assert node.isPending()
+            node.setRunning()
+            self.app.broadcastNodesInformation([node])
 
     def answerPartitionTable(self, conn, ptid, row_list):
+        node = self.app.nm.getByUUID(conn.getUUID())
+        assert node.isPending()
+        node.setRunning()
         if ptid != self.target_ptid:
             # If this is not from a target node, ignore it.
             neo.lib.logging.warn('Got %s while waiting %s', dump(ptid),
                     dump(self.target_ptid))
-            return
+        else:
+            self._broadcastPartitionTable(ptid, row_list)
+        self.app.broadcastNodesInformation([node])
+
+    def _broadcastPartitionTable(self, ptid, row_list):
         try:
             new_nodes = self.app.pt.load(ptid, row_list, self.app.nm)
         except IndexError:
@@ -152,4 +152,3 @@ class RecoveryManager(MasterHandler):
             for node in self.app.nm.getAdminList(only_identified=True):
                 node.notify(notification)
                 node.notify(partition_table)
-

Modified: trunk/neo/tests/master/testRecovery.py
==============================================================================
--- trunk/neo/tests/master/testRecovery.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testRecovery.py [iso-8859-1] Thu Sep  8 19:51:13 2011
@@ -97,12 +97,14 @@ class MasterRecoveryTests(NeoUnitTestBas
         # not from target node, ignore
         uuid = self.identifyToMasterNode(NodeTypes.STORAGE, port=self.storage_port)
         conn = self.getFakeConnection(uuid, self.storage_port)
+        node = self.app.nm.getByUUID(conn.getUUID())
         offset = 1
         cell_list = [(offset, uuid, CellStates.UP_TO_DATE)]
         cells = self.app.pt.getRow(offset)
         for cell, state in cells:
             self.assertEqual(state, CellStates.OUT_OF_DATE)
         recovery.target_ptid = 2
+        node.setPending()
         recovery.answerPartitionTable(conn, 1, cell_list)
         cells = self.app.pt.getRow(offset)
         for cell, state in cells:
@@ -114,6 +116,7 @@ class MasterRecoveryTests(NeoUnitTestBas
         cells = self.app.pt.getRow(offset)
         for cell, state in cells:
             self.assertEqual(state, CellStates.OUT_OF_DATE)
+        node.setPending()
         recovery.answerPartitionTable(conn, None, cell_list)
         cells = self.app.pt.getRow(offset)
         for cell, state in cells:
@@ -124,6 +127,7 @@ class MasterRecoveryTests(NeoUnitTestBas
         offset = 1000000
         self.assertFalse(self.app.pt.hasOffset(offset))
         cell_list = [(offset, ((uuid, NodeStates.DOWN,),),)]
+        node.setPending()
         self.checkProtocolErrorRaised(recovery.answerPartitionTable, conn,
             2, cell_list)
 

Modified: trunk/neo/tests/threaded/test.py
==============================================================================
--- trunk/neo/tests/threaded/test.py [iso-8859-1] (original)
+++ trunk/neo/tests/threaded/test.py [iso-8859-1] Thu Sep  8 19:51:13 2011
@@ -109,8 +109,7 @@ class Test(NEOThreadedTest):
         cluster.reset()
         try:
             cluster.start(storage_list=(s1,), fast_startup=fast_startup)
-            self.assertEqual((NodeStates.UNKNOWN, None)[fast_startup],
-                             cluster.getNodeState(s2))
+            self.assertEqual(NodeStates.UNKNOWN, cluster.getNodeState(s2))
         finally:
             cluster.stop()
 
@@ -140,10 +139,4 @@ class Test(NEOThreadedTest):
             cluster.stop()
 
     def testVerificationCommitUnfinishedTransactionsFastStartup(self):
-        # XXX: This test fails because if the admin starts the cluster without
-        #      any storage node, the master (which is still in recovery stage)
-        #      does not handle properly incoming non-empty storage nodes.
-        #      In particular, it does not ask the last ids to the storage,
-        #      and the client will ask objects at tid 0.
-        #      See also RecoveryManager.identifyStorageNode
         self.testVerificationCommitUnfinishedTransactions(True)




More information about the Neo-report mailing list