[Neo-report] r2180 gregory - in /trunk/neo: master/ master/handlers/ storage/ storage/hand...
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Jun 21 17:53:51 CEST 2010
Author: gregory
Date: Mon Jun 21 17:53:41 2010
New Revision: 2180
Log:
Move stored OIDs check to master side.
- The storages no more check the last OID during a store
- The storages inconditionnaly store the last OID notified by the master
- The master check during the if a greater oid was used by a client
- The master always notify the last OID when a pool is generated or if the
check above is True
- The master's transaction manager manager the last oid and oid generator
Modified:
trunk/neo/master/app.py
trunk/neo/master/handlers/client.py
trunk/neo/master/handlers/storage.py
trunk/neo/master/recovery.py
trunk/neo/master/transactions.py
trunk/neo/storage/handlers/__init__.py
trunk/neo/storage/handlers/initialization.py
trunk/neo/storage/transactions.py
trunk/neo/tests/__init__.py
trunk/neo/tests/functional/testClient.py
trunk/neo/tests/master/testClientHandler.py
trunk/neo/tests/master/testMasterApp.py
trunk/neo/tests/master/testRecovery.py
trunk/neo/tests/master/testStorageHandler.py
trunk/neo/tests/master/testTransactions.py
Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -18,7 +18,6 @@
from neo import logging
import os, sys
from time import time
-from struct import pack, unpack
from neo import protocol
from neo.protocol import UUID_NAMESPACES
@@ -84,9 +83,6 @@ class Application(object):
uuid = self.getNewUUID(NodeTypes.MASTER)
self.uuid = uuid
- # The last OID.
- self.loid = None
-
# election related data
self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set()
@@ -303,7 +299,8 @@ class Application(object):
" Outdate cell of non-working nodes and broadcast changes """
self.broadcastPartitionChanges(self.pt.outdate())
- def broadcastLastOID(self, oid):
+ def broadcastLastOID(self):
+ oid = self.tm.getLastOID()
logging.debug('Broadcast last OID to storages : %s' % dump(oid))
packet = Packets.NotifyLastOID(oid)
for node in self.nm.getIdentifiedList():
@@ -456,15 +453,6 @@ class Application(object):
handler.connectionCompleted(conn)
self.cluster_state = state
- def getNewOIDList(self, num_oids):
- if self.loid is None:
- raise RuntimeError, 'I do not know the last OID'
- oid = unpack('!Q', self.loid)[0] + 1
- oid_list = [pack('!Q', oid + i) for i in xrange(num_oids)]
- self.loid = oid_list[-1]
- self.broadcastLastOID(self.loid)
- return oid_list
-
def getNewUUID(self, node_type):
# build an UUID
uuid = os.urandom(15)
Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -56,8 +56,8 @@ class ClientServiceHandler(MasterHandler
conn.answer(Packets.AnswerBeginTransaction(tid))
def askNewOIDs(self, conn, num_oids):
- oid_list = self.app.getNewOIDList(num_oids)
- conn.answer(Packets.AnswerNewOIDs(oid_list))
+ conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids)))
+ self.app.broadcastLastOID()
def askFinishTransaction(self, conn, tid, oid_list):
app = self.app
@@ -78,6 +78,10 @@ class ClientServiceHandler(MasterHandler
uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part)
if cell.getNodeState() != NodeStates.HIDDEN))
+ # check if greater and foreign OID was stored
+ if self.app.tm.updateLastOID(oid_list):
+ self.app.broadcastLastOID()
+
# Request locking data.
# build a new set as we may not send the message to all nodes as some
# might be not reachable at that time
Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -48,8 +48,9 @@ class StorageServiceHandler(BaseServiceH
def askLastIDs(self, conn):
app = self.app
- conn.answer(Packets.AnswerLastIDs(app.loid, app.tm.getLastTID(),
- app.pt.getID()))
+ loid = app.tm.getLastOID()
+ ltid = app.tm.getLastTID()
+ conn.answer(Packets.AnswerLastIDs(loid, ltid, app.pt.getID()))
def askUnfinishedTransactions(self, conn):
p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
Modified: trunk/neo/master/recovery.py
==============================================================================
--- trunk/neo/master/recovery.py [iso-8859-1] (original)
+++ trunk/neo/master/recovery.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -59,7 +59,7 @@ class RecoveryManager(MasterHandler):
self.app.changeClusterState(ClusterStates.RECOVERING)
em = self.app.em
- self.app.loid = None
+ self.app.tm.setLastOID(None)
self.app.pt.setID(None)
# collect the last partition table available
@@ -81,7 +81,7 @@ class RecoveryManager(MasterHandler):
self.app.broadcastNodesInformation(refused_node_set)
logging.debug('cluster starts with loid=%s and this partition table :',
- dump(self.app.loid))
+ dump(self.app.tm.getLastOID()))
self.app.pt.log()
def buildFromScratch(self):
@@ -96,7 +96,7 @@ class RecoveryManager(MasterHandler):
node.setRunning()
self.app.broadcastNodesInformation(node_list)
# resert IDs generators
- self.app.loid = '\0' * 8
+ self.app.tm.setLastOID('\0' * 8)
# build the partition with this node
pt.setID(pack('!Q', 1))
pt.make(node_list)
@@ -115,13 +115,9 @@ class RecoveryManager(MasterHandler):
conn.ask(Packets.AskLastIDs())
def answerLastIDs(self, conn, loid, ltid, lptid):
- app = self.app
# Get max values.
if loid is not None:
- if app.loid is None:
- app.loid = loid
- else:
- app.loid = max(loid, app.loid)
+ self.app.tm.setLastOID(max(loid, self.app.tm.getLastOID()))
if ltid is not None:
self.app.tm.setLastTID(ltid)
if lptid > self.target_ptid:
@@ -130,7 +126,6 @@ class RecoveryManager(MasterHandler):
conn.ask(Packets.AskPartitionTable())
def answerPartitionTable(self, conn, ptid, row_list):
- app = self.app
if ptid != self.target_ptid:
# If this is not from a target node, ignore it.
logging.warn('Got %s while waiting %s', dump(ptid),
Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -19,7 +19,6 @@ from time import time, gmtime
from struct import pack, unpack
from neo.util import dump
from neo import logging
-from neo import protocol
class Transaction(object):
"""
@@ -127,6 +126,7 @@ class TransactionManager(object):
# node -> transactions mapping
self._node_dict = {}
self._last_tid = None
+ self._last_oid = None
def __getitem__(self, tid):
"""
@@ -143,6 +143,32 @@ class TransactionManager(object):
def items(self):
return self._tid_dict.items()
+ def getNextOIDList(self, num_oids):
+ """ Generate a new OID list """
+ if self._last_oid is None:
+ raise RuntimeError, 'I do not know the last OID'
+ oid = unpack('!Q', self._last_oid)[0] + 1
+ oid_list = [pack('!Q', oid + i) for i in xrange(num_oids)]
+ self._last_oid = oid_list[-1]
+ return oid_list
+
+ def updateLastOID(self, oid_list):
+ """
+ Updates the last oid with the max of those supplied if greater than
+ the current known, returns True if changed
+ """
+ max_oid = oid_list and max(oid_list) or None # oid_list might be empty
+ if max_oid > self._last_oid:
+ self._last_oid = max_oid
+ return True
+ return False
+
+ def setLastOID(self, oid):
+ self._last_oid = oid
+
+ def getLastOID(self):
+ return self._last_oid
+
def _nextTID(self):
""" Compute the next TID based on the current time and check collisions """
tm = time()
Modified: trunk/neo/storage/handlers/__init__.py
==============================================================================
--- trunk/neo/storage/handlers/__init__.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/__init__.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -39,7 +39,6 @@ class BaseMasterHandler(EventHandler):
self.__class__.__name__)
def notifyLastOID(self, conn, oid):
- self.app.tm.setLastOID(oid)
self.app.dm.setLastOID(oid)
def notifyNodeInformation(self, conn, node_list):
Modified: trunk/neo/storage/handlers/initialization.py
==============================================================================
--- trunk/neo/storage/handlers/initialization.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/initialization.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -56,7 +56,6 @@ class InitializationHandler(BaseMasterHa
self.app.has_partition_table = True
def answerLastIDs(self, conn, loid, ltid, lptid):
- self.app.tm.setLastOID(loid)
self.app.dm.setLastOID(loid)
self.app.has_last_ids = True
Modified: trunk/neo/storage/transactions.py
==============================================================================
--- trunk/neo/storage/transactions.py [iso-8859-1] (original)
+++ trunk/neo/storage/transactions.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -114,8 +114,6 @@ class TransactionManager(object):
self._store_lock_dict = {}
self._load_lock_dict = {}
self._uuid_dict = {}
- self._loid = None
- self._loid_seen = None
def __contains__(self, tid):
"""
@@ -144,10 +142,6 @@ class TransactionManager(object):
result = result.getObject(oid)
return result
- def setLastOID(self, oid):
- assert oid >= self._loid
- self._loid = oid
-
def reset(self):
"""
Reset the transaction manager
@@ -186,13 +180,6 @@ class TransactionManager(object):
self._app.dm.finishTransaction(tid)
self.abort(tid, even_if_locked=True)
- # update loid if needed
- if self._loid_seen > self._loid:
- args = dump(self._loid_seen), dump(self._loid)
- logging.warning('Greater OID used in StoreObject : %s > %s', *args)
- self._loid = self._loid_seen
- self._app.dm.setLastOID(self._loid)
-
def storeTransaction(self, tid, oid_list, user, desc, ext, packed):
"""
Store transaction information received from client node
@@ -240,9 +227,6 @@ class TransactionManager(object):
transaction = self._transaction_dict[tid]
transaction.addObject(oid, compression, checksum, data, value_serial)
- # update loid
- self._loid_seen = oid
-
def abort(self, tid, even_if_locked=True):
"""
Abort a transaction
Modified: trunk/neo/tests/__init__.py
==============================================================================
--- trunk/neo/tests/__init__.py [iso-8859-1] (original)
+++ trunk/neo/tests/__init__.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -295,6 +295,9 @@ class NeoTestBase(unittest.TestCase):
def checkAbortTransaction(self, conn, **kw):
return self.checkNotifyPacket(conn, Packets.AbortTransaction, **kw)
+ def checkNotifyLastOID(self, conn, **kw):
+ return self.checkNotifyPacket(conn, Packets.NotifyLastOID, **kw)
+
def checkAnswerTransactionFinished(self, conn, **kw):
return self.checkAnswerPacket(conn, Packets.AnswerTransactionFinished, **kw)
Modified: trunk/neo/tests/functional/testClient.py
==============================================================================
--- trunk/neo/tests/functional/testClient.py [iso-8859-1] (original)
+++ trunk/neo/tests/functional/testClient.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -19,6 +19,7 @@ import os
import unittest
import transaction
import ZODB
+from struct import pack, unpack
from ZODB.FileStorage import FileStorage
from ZODB.POSException import ConflictError
from ZODB.tests.StorageTestBase import zodb_pickle
@@ -300,6 +301,31 @@ class ClientTests(NEOFunctionalTest):
st3.tpc_finish(t3)
self.runWithTimeout(10, test)
+ def testGreaterOIDSaved(self):
+ """
+ Store an object with an OID greater than the last generated by the
+ master. This OID must be intercepted at commit, used for next OID
+ generations and persistently saved on storage nodes.
+ """
+ self.neo = NEOCluster(['test_neo1'], replicas=0,
+ temp_dir=self.getTempDirectory())
+ neoctl = self.neo.getNEOCTL()
+ self.neo.start()
+ db1, conn1 = self.neo.getZODBConnection()
+ st1 = conn1._storage
+ t1 = transaction.Transaction()
+ rev = '\0' * 8
+ data = zodb_pickle(PObject())
+ my_oid = pack('!Q', 100000)
+ # store an object with this OID
+ st1.tpc_begin(t1)
+ st1.store(my_oid, rev, data, '', t1)
+ st1.tpc_vote(t1)
+ st1.tpc_finish(t1)
+ # request an oid, should be greater than mine
+ oid = st1.new_oid()
+ self.assertTrue(oid > my_oid)
+
def test_suite():
return unittest.makeSuite(ClientTests)
Modified: trunk/neo/tests/master/testClientHandler.py
==============================================================================
--- trunk/neo/tests/master/testClientHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testClientHandler.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -80,12 +80,19 @@ class MasterClientHandlerTests(NeoTestBa
def test_08_askNewOIDs(self):
service = self.service
- loid = self.app.loid
+ oid1, oid2 = self.getOID(1), self.getOID(2)
+ self.app.tm.setLastOID(oid1)
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
+ for node in self.app.nm.getStorageList():
+ conn = self.getFakeConnection(node.getUUID(), node.getAddress())
+ node.setConnection(conn)
service.askNewOIDs(conn, 1)
- self.assertTrue(loid < self.app.loid)
+ self.assertTrue(self.app.tm.getLastOID() > oid1)
+ for node in self.app.nm.getStorageList():
+ conn = node.getConnection()
+ self.assertEquals(self.checkNotifyLastOID(conn, decode=True), (oid2,))
def test_09_askFinishTransaction(self):
service = self.service
Modified: trunk/neo/tests/master/testMasterApp.py
==============================================================================
--- trunk/neo/tests/master/testMasterApp.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testMasterApp.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -32,21 +32,6 @@ class MasterAppTests(NeoTestBase):
def tearDown(self):
NeoTestBase.tearDown(self)
- def test_05_getNewOIDList(self):
- # must raise as we don"t have one
- self.assertEqual(self.app.loid, None)
- self.app.loid = None
- self.assertRaises(RuntimeError, self.app.getNewOIDList, 1)
- # ask list
- self.app.loid = p64(1)
- oid_list = self.app.getNewOIDList(15)
- self.assertEqual(len(oid_list), 15)
- i = 2
- # begin from 0, so generated oid from 1 to 15
- for oid in oid_list:
- self.assertEqual(u64(oid), i)
- i+=1
-
def test_06_broadcastNodeInformation(self):
# defined some nodes to which data will be send
master_uuid = self.getNewUUID()
Modified: trunk/neo/tests/master/testRecovery.py
==============================================================================
--- trunk/neo/tests/master/testRecovery.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testRecovery.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -93,26 +93,24 @@ class MasterRecoveryTests(NeoTestBase):
def test_09_answerLastIDs(self):
recovery = self.recovery
uuid = self.identifyToMasterNode()
- loid = self.app.loid = '\1' * 8
- self.app.tm.setLastTID('\1' * 8)
- ltid = self.app.tm.getLastTID()
- self.app.pt.setID('\1' * 8)
- lptid = self.app.pt.getID()
+ oid1 = self.getOID(1)
+ oid2 = self.getOID(2)
+ tid1 = self.getNextTID()
+ tid2 = self.getNextTID(tid1)
+ ptid1 = self.getPTID(1)
+ ptid2 = self.getPTID(2)
+ self.app.tm.setLastOID(oid1)
+ self.app.tm.setLastTID(tid1)
+ self.app.pt.setID(ptid1)
# send information which are later to what PMN knows, this must update target node
conn = self.getFakeConnection(uuid, self.storage_port)
- new_ptid = unpack('!Q', lptid)[0]
- new_ptid = pack('!Q', new_ptid + 1)
- oid = unpack('!Q', loid)[0]
- new_oid = pack('!Q', oid + 1)
- upper, lower = unpack('!LL', ltid)
- new_tid = pack('!LL', upper, lower + 10)
- self.assertTrue(new_ptid > self.app.pt.getID())
- self.assertTrue(new_oid > self.app.loid)
- self.assertTrue(new_tid > self.app.tm.getLastTID())
- recovery.answerLastIDs(conn, new_oid, new_tid, new_ptid)
- self.assertEquals(new_oid, self.app.loid)
- self.assertEquals(new_tid, self.app.tm.getLastTID())
- self.assertEquals(new_ptid, recovery.target_ptid)
+ self.assertTrue(ptid2 > self.app.pt.getID())
+ self.assertTrue(oid2 > self.app.tm.getLastOID())
+ self.assertTrue(tid2 > self.app.tm.getLastTID())
+ recovery.answerLastIDs(conn, oid2, tid2, ptid2)
+ self.assertEquals(oid2, self.app.tm.getLastOID())
+ self.assertEquals(tid2, self.app.tm.getLastTID())
+ self.assertEquals(ptid2, recovery.target_ptid)
def test_10_answerPartitionTable(self):
Modified: trunk/neo/tests/master/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testStorageHandler.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -126,8 +126,9 @@ class MasterStorageHandlerTests(NeoTestB
# give a uuid
conn = self.getFakeConnection(node.getUUID(), self.storage_address)
ptid = self.app.pt.getID()
- oid = self.app.loid = '\1' * 8
- tid = '\1' * 8
+ oid = self.getOID(1)
+ tid = self.getNextTID()
+ self.app.tm.setLastOID(oid)
self.app.tm.setLastTID(tid)
service.askLastIDs(conn)
packet = self.checkAnswerLastIDs(conn)
@@ -136,7 +137,6 @@ class MasterStorageHandlerTests(NeoTestB
self.assertEqual(ltid, tid)
self.assertEqual(lptid, ptid)
-
def test_13_askUnfinishedTransactions(self):
service = self.service
node, conn = self.identifyToMasterNode()
Modified: trunk/neo/tests/master/testTransactions.py
==============================================================================
--- trunk/neo/tests/master/testTransactions.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testTransactions.py [iso-8859-1] Mon Jun 21 17:53:41 2010
@@ -101,6 +101,19 @@ class testTransactionManager(NeoTestBase
self.assertEqual(txnman.getPendingList(), [])
self.assertFalse(txnman.hasPending())
+ def test_getNextOIDList(self):
+ txnman = TransactionManager()
+ # must raise as we don"t have one
+ self.assertEqual(txnman.getLastOID(), None)
+ self.assertRaises(RuntimeError, txnman.getNextOIDList, 1)
+ # ask list
+ txnman.setLastOID(self.getOID(1))
+ oid_list = txnman.getNextOIDList(15)
+ self.assertEqual(len(oid_list), 15)
+ # begin from 1, so generated oid from 2 to 16
+ for i, oid in zip(xrange(len(oid_list)), oid_list):
+ self.assertEqual(oid, self.getOID(i+2))
+
def test_getNextTID(self):
txnman = TransactionManager()
# no previous TID
More information about the Neo-report
mailing list