[Neo-report] r2835 jm - in /trunk/neo: admin/ master/ tests/master/ tests/threaded/
nobody at svn.erp5.org
Thu Sep 8 19:51:13 CEST 2011
Author: jm
Date: Thu Sep 8 19:51:13 2011
New Revision: 2835
Log:
Fix handling of incoming non-empty storage nodes just after startup is allowed
Modified:
trunk/neo/admin/handler.py
trunk/neo/master/recovery.py
trunk/neo/tests/master/testRecovery.py
trunk/neo/tests/threaded/test.py
Modified: trunk/neo/admin/handler.py
==============================================================================
--- trunk/neo/admin/handler.py [iso-8859-1] (original)
+++ trunk/neo/admin/handler.py [iso-8859-1] Thu Sep 8 19:51:13 2011
@@ -157,11 +157,6 @@ class MasterEventHandler(EventHandler):
def notifyNodeInformation(self, conn, node_list):
app = self.app
app.nm.update(node_list)
- if not app.pt.filled():
- # Re-ask partition table, in case node change filled it.
- # XXX: we should only ask it if received states indicates it is
- # possible (ignore TEMPORARILY_DOWN for example)
- conn.ask(Packets.AskPartitionTable())
class MasterRequestEventHandler(EventHandler):
""" This class handle all answer from primary master node"""
Modified: trunk/neo/master/recovery.py
==============================================================================
--- trunk/neo/master/recovery.py [iso-8859-1] (original)
+++ trunk/neo/master/recovery.py [iso-8859-1] Thu Sep 8 19:51:13 2011
@@ -23,7 +23,6 @@ from neo.lib.protocol import Packets, Pr
from neo.lib.protocol import NotReadyError, ZERO_OID, ZERO_TID
from neo.master.handlers import MasterHandler
-REQUIRED_NODE_NUMBER = 1
class RecoveryManager(MasterHandler):
"""
@@ -42,14 +41,7 @@ class RecoveryManager(MasterHandler):
"""
Returns the handler for storage nodes
"""
- # XXX: Looking at 'uuid' is not a good criterion to know if the storage
- # is empty. Empty nodes should be accepted here.
- # This is also the first step to fix handling of incoming
- # non-empty storage nodes, whereas startup was already allowed.
- if uuid is None and not self.app._startup_allowed:
- neo.lib.logging.info('reject empty storage node')
- raise NotReadyError
- return (uuid, NodeStates.RUNNING, self)
+ return uuid, NodeStates.PENDING, self
def run(self):
"""
@@ -67,22 +59,37 @@ class RecoveryManager(MasterHandler):
self.app.pt.setID(None)
# collect the last partition table available
- while not self.app._startup_allowed:
+ while 1:
em.poll(1)
+ if self.app._startup_allowed:
+ allowed_node_set = set()
+ for node in self.app.nm.getStorageList():
+ if node.isPending():
+ break # waiting for an answer
+ if node.isRunning():
+ allowed_node_set.add(node)
+ else:
+ if allowed_node_set:
+ break # at least one storage node is ready and none is pending
neo.lib.logging.info('startup allowed')
- # build a new partition table
if self.app.pt.getID() is None:
- self.buildFromScratch()
+ neo.lib.logging.info('creating a new partition table')
+ # reset ID generators & build a new partition table with running nodes
+ self.app.tm.setLastOID(ZERO_OID)
+ self.app.pt.make(allowed_node_set)
+ self._broadcastPartitionTable(self.app.pt.getID(),
+ self.app.pt.getRowList())
# collect nodes that are connected but not in the selected partition
# table and set them to pending state
- allowed_node_set = set(self.app.pt.getNodeList())
- refused_node_set = set(self.app.nm.getStorageList()) - allowed_node_set
- for node in refused_node_set:
- node.setPending()
- self.app.broadcastNodesInformation(refused_node_set)
+ refused_node_set = allowed_node_set.difference(
+ self.app.pt.getNodeList())
+ if refused_node_set:
+ for node in refused_node_set:
+ node.setPending()
+ self.app.broadcastNodesInformation(refused_node_set)
self.app.setLastTransaction(self.app.tm.getLastTID())
neo.lib.logging.debug(
@@ -90,23 +97,6 @@ class RecoveryManager(MasterHandler):
'table :', dump(self.app.tm.getLastOID()))
self.app.pt.log()
- def buildFromScratch(self):
- nm, em, pt = self.app.nm, self.app.em, self.app.pt
- neo.lib.logging.debug('creating a new partition table, wait for a ' \
- 'storage node')
- # wait for some empty storage nodes, they are accepted
- while len(nm.getStorageList()) < REQUIRED_NODE_NUMBER:
- em.poll(1)
- # take the first node available
- node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER]
- for node in node_list:
- node.setRunning()
- self.app.broadcastNodesInformation(node_list)
- # reset ID generators
- self.app.tm.setLastOID(ZERO_OID)
- # build the partition with this node
- pt.make(node_list)
-
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
assert node is not None
@@ -117,10 +107,8 @@ class RecoveryManager(MasterHandler):
self.app.broadcastNodesInformation([node])
def connectionCompleted(self, conn):
- # XXX: handler split review needed to remove this hack
- if not self.app._startup_allowed:
- # ask the last IDs to perform the recovery
- conn.ask(Packets.AskLastIDs())
+ # ask the last IDs to perform the recovery
+ conn.ask(Packets.AskLastIDs())
def answerLastIDs(self, conn, loid, ltid, lptid):
# Get max values.
@@ -132,13 +120,25 @@ class RecoveryManager(MasterHandler):
# something newer
self.target_ptid = lptid
conn.ask(Packets.AskPartitionTable())
+ else:
+ node = self.app.nm.getByUUID(conn.getUUID())
+ assert node.isPending()
+ node.setRunning()
+ self.app.broadcastNodesInformation([node])
def answerPartitionTable(self, conn, ptid, row_list):
+ node = self.app.nm.getByUUID(conn.getUUID())
+ assert node.isPending()
+ node.setRunning()
if ptid != self.target_ptid:
# If this is not from a target node, ignore it.
neo.lib.logging.warn('Got %s while waiting %s', dump(ptid),
dump(self.target_ptid))
- return
+ else:
+ self._broadcastPartitionTable(ptid, row_list)
+ self.app.broadcastNodesInformation([node])
+
+ def _broadcastPartitionTable(self, ptid, row_list):
try:
new_nodes = self.app.pt.load(ptid, row_list, self.app.nm)
except IndexError:
@@ -152,4 +152,3 @@ class RecoveryManager(MasterHandler):
for node in self.app.nm.getAdminList(only_identified=True):
node.notify(notification)
node.notify(partition_table)
-
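One subtlety in the new wait loop of RecoveryManager.run() above: nodes now join in PENDING state and are only promoted to RUNNING after answering AskLastIDs (and, for the target node, AskPartitionTable), and the loop relies on Python's for/else, where the else suite runs only when the for loop completed without break, i.e. when no storage node is still pending. A self-contained sketch of the same decision logic, with plain strings standing in for NEO's node states rather than the real classes:

    PENDING, RUNNING = 'PENDING', 'RUNNING'

    def ready_nodes(storage_list):
        """Return the set of running nodes, or None if we must keep polling."""
        allowed = set()
        for name, state in storage_list:
            if state == PENDING:
                break              # still waiting for this node's answer
            if state == RUNNING:
                allowed.add(name)
        else:                      # no node is PENDING anymore
            if allowed:
                return allowed     # at least one node is ready
        return None                # keep polling

    assert ready_nodes([('s1', RUNNING), ('s2', RUNNING)]) == {'s1', 's2'}
    assert ready_nodes([('s1', RUNNING), ('s2', PENDING)]) is None
    assert ready_nodes([]) is None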
Modified: trunk/neo/tests/master/testRecovery.py
==============================================================================
--- trunk/neo/tests/master/testRecovery.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testRecovery.py [iso-8859-1] Thu Sep 8 19:51:13 2011
@@ -97,12 +97,14 @@ class MasterRecoveryTests(NeoUnitTestBas
# not from target node, ignore
uuid = self.identifyToMasterNode(NodeTypes.STORAGE, port=self.storage_port)
conn = self.getFakeConnection(uuid, self.storage_port)
+ node = self.app.nm.getByUUID(conn.getUUID())
offset = 1
cell_list = [(offset, uuid, CellStates.UP_TO_DATE)]
cells = self.app.pt.getRow(offset)
for cell, state in cells:
self.assertEqual(state, CellStates.OUT_OF_DATE)
recovery.target_ptid = 2
+ node.setPending()
recovery.answerPartitionTable(conn, 1, cell_list)
cells = self.app.pt.getRow(offset)
for cell, state in cells:
@@ -114,6 +116,7 @@ class MasterRecoveryTests(NeoUnitTestBas
cells = self.app.pt.getRow(offset)
for cell, state in cells:
self.assertEqual(state, CellStates.OUT_OF_DATE)
+ node.setPending()
recovery.answerPartitionTable(conn, None, cell_list)
cells = self.app.pt.getRow(offset)
for cell, state in cells:
@@ -124,6 +127,7 @@ class MasterRecoveryTests(NeoUnitTestBas
offset = 1000000
self.assertFalse(self.app.pt.hasOffset(offset))
cell_list = [(offset, ((uuid, NodeStates.DOWN,),),)]
+ node.setPending()
self.checkProtocolErrorRaised(recovery.answerPartitionTable, conn,
2, cell_list)
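The extra node.setPending() calls above exist because answerPartitionTable now asserts that the answering node is still pending before promoting it to running, and the test replays several answers over the same fake connection. A small self-contained sketch of that precondition, using a simplified stand-in rather than the real NEO node class:

    class StubNode:
        def __init__(self):
            self.state = 'PENDING'
        def isPending(self):
            return self.state == 'PENDING'
        def setPending(self):
            self.state = 'PENDING'
        def setRunning(self):
            self.state = 'RUNNING'

    def answer_partition_table(node):
        # Mirrors the new precondition: only a PENDING node may answer.
        assert node.isPending()
        node.setRunning()

    node = StubNode()
    answer_partition_table(node)  # first answer: PENDING -> RUNNING
    node.setPending()             # what the test does before replaying
    answer_partition_table(node)  # a further answer is accepted again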
Modified: trunk/neo/tests/threaded/test.py
==============================================================================
--- trunk/neo/tests/threaded/test.py [iso-8859-1] (original)
+++ trunk/neo/tests/threaded/test.py [iso-8859-1] Thu Sep 8 19:51:13 2011
@@ -109,8 +109,7 @@ class Test(NEOThreadedTest):
cluster.reset()
try:
cluster.start(storage_list=(s1,), fast_startup=fast_startup)
- self.assertEqual((NodeStates.UNKNOWN, None)[fast_startup],
- cluster.getNodeState(s2))
+ self.assertEqual(NodeStates.UNKNOWN, cluster.getNodeState(s2))
finally:
cluster.stop()
@@ -140,10 +139,4 @@ class Test(NEOThreadedTest):
cluster.stop()
def testVerificationCommitUnfinishedTransactionsFastStartup(self):
- # XXX: This test fails because if the admin starts the cluster without
- # any storage node, the master (which is still in recovery stage)
- # does not handle properly incoming non-empty storage nodes.
- # In particular, it does not ask the last ids to the storage,
- # and the client will ask objects at tid 0.
- # See also RecoveryManager.identifyStorageNode
self.testVerificationCommitUnfinishedTransactions(True)
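A note on the assertion simplified in this file: the old expected value indexed a tuple with a boolean, which works in Python because bool is a subclass of int, so (a, b)[flag] picks a when flag is False and b when it is True. After the fix, the expected state is UNKNOWN in both startup modes, hence the plain comparison. A quick illustration, with a string standing in for NodeStates.UNKNOWN:

    UNKNOWN = 'UNKNOWN'  # stand-in for NodeStates.UNKNOWN

    for fast_startup in (False, True):
        old_expected = (UNKNOWN, None)[fast_startup]   # bool indexes the tuple
        assert old_expected == (None if fast_startup else UNKNOWN)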