[Neo-report] r2515 vincent - in /trunk/neo: ./ client/ client/handlers/ storage/ storage/h...
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Dec 10 18:08:08 CET 2010
Author: vincent
Date: Fri Dec 10 18:08:08 2010
New Revision: 2515
Log:
Provide ReadVerifyingStorage.
Also, fix bogus checkCurrentSerialInTransaction implementation (it was not
properly locking objects, allowing them to become non-current by the time
tpc_finish occurs).
Modified:
trunk/neo/client/Storage.py
trunk/neo/client/app.py
trunk/neo/client/handlers/storage.py
trunk/neo/handler.py
trunk/neo/protocol.py
trunk/neo/storage/handlers/client.py
trunk/neo/storage/transactions.py
trunk/neo/tests/testProtocol.py
Modified: trunk/neo/client/Storage.py
==============================================================================
--- trunk/neo/client/Storage.py [iso-8859-1] (original)
+++ trunk/neo/client/Storage.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -53,6 +53,7 @@ class Storage(BaseStorage.BaseStorage,
# ZODB.interfaces.IStorageIteration,
ZODB.interfaces.IStorageUndoable,
ZODB.interfaces.IExternalGC,
+ ZODB.interfaces.ReadVerifyingStorage,
)
def __init__(self, master_nodes, name, connector=None, read_only=False,
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -702,33 +702,40 @@ class Application(object):
data = data_dict[oid]
tid = local_var.tid
resolved = False
- if conflict_serial <= tid:
- new_data = tryToResolveConflict(oid, conflict_serial, serial,
- data)
- if new_data is not None:
- neo.logging.info('Conflict resolution succeed for ' \
- '%r:%r with %r', dump(oid), dump(serial),
- dump(conflict_serial))
- # Mark this conflict as resolved
- resolved_serial_set.update(conflict_serial_dict.pop(oid))
- # Try to store again
- self._store(oid, conflict_serial, new_data)
- append(oid)
- resolved = True
+ if data is not None:
+ if conflict_serial <= tid:
+ new_data = tryToResolveConflict(oid, conflict_serial,
+ serial, data)
+ if new_data is not None:
+ neo.logging.info('Conflict resolution succeed for ' \
+ '%r:%r with %r', dump(oid), dump(serial),
+ dump(conflict_serial))
+ # Mark this conflict as resolved
+ resolved_serial_set.update(conflict_serial_dict.pop(
+ oid))
+ # Try to store again
+ self._store(oid, conflict_serial, new_data)
+ append(oid)
+ resolved = True
+ else:
+ neo.logging.info('Conflict resolution failed for ' \
+ '%r:%r with %r', dump(oid), dump(serial),
+ dump(conflict_serial))
else:
- neo.logging.info('Conflict resolution failed for ' \
- '%r:%r with %r', dump(oid), dump(serial),
- dump(conflict_serial))
- else:
- neo.logging.info('Conflict reported for %r:%r with later ' \
- 'transaction %r , cannot resolve conflict.', dump(oid),
- dump(serial), dump(conflict_serial))
+ neo.logging.info('Conflict reported for %r:%r with ' \
+ 'later transaction %r , cannot resolve conflict.',
+ dump(oid), dump(serial), dump(conflict_serial))
if not resolved:
# XXX: Is it really required to remove from data_dict ?
del data_dict[oid]
local_var.data_list.remove(oid)
- raise ConflictError(oid=oid,
- serials=(tid, serial), data=data)
+ if data is None:
+ exc = ReadConflictError(oid=oid, serials=(conflict_serial,
+ serial))
+ else:
+ exc = ConflictError(oid=oid, serials=(tid, serial),
+ data=data)
+ raise exc
return result
@profiler_decorator
@@ -1252,9 +1259,31 @@ class Application(object):
return self._load(oid)[1]
def checkCurrentSerialInTransaction(self, oid, serial, transaction):
- if transaction is not self.local_var.txn:
+ local_var = self.local_var
+ if transaction is not local_var.txn:
raise StorageTransactionError(self, transaction)
- committed_tid = self.getLastTID(oid)
- if committed_tid != serial:
- raise ReadConflictError(oid=oid, serials=(committed_tid, serial))
+ cell_list = self._getCellListForOID(oid, writable=True)
+ if len(cell_list) == 0:
+ raise NEOStorageError
+ p = Packets.AskCheckCurrentSerial(local_var.tid, serial, oid)
+ getConnForCell = self.cp.getConnForCell
+ queue = local_var.queue
+ local_var.object_serial_dict[oid] = serial
+ # Placeholders
+ local_var.object_stored_counter_dict[oid] = {}
+ data_dict = local_var.data_dict
+ if oid not in data_dict:
+ # Marker value so we don't try to resolve conflicts.
+ data_dict[oid] = None
+ local_var.data_list.append(oid)
+ for cell in cell_list:
+ conn = getConnForCell(cell)
+ if conn is None:
+ continue
+ try:
+ conn.ask(p, queue=queue)
+ except ConnectionClosed:
+ continue
+
+ self._waitAnyMessage(False)
Modified: trunk/neo/client/handlers/storage.py
==============================================================================
--- trunk/neo/client/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/storage.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -89,6 +89,8 @@ class StorageAnswersHandler(AnswerBaseHa
object_stored_counter_dict[serial] = \
object_stored_counter_dict.get(serial, 0) + 1
+ answerCheckCurrentSerial = answerStoreObject
+
def answerStoreTransaction(self, conn, tid):
if tid != self.app.getTID():
raise ProtocolError('Wrong TID, transaction not started')
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -372,6 +372,11 @@ class EventHandler(object):
def answerLastTransaction(self, conn, tid):
raise UnexpectedPacketError
+ def askCheckCurrentSerial(self, conn, tid, serial, oid):
+ raise UnexpectedPacketError
+
+ answerCheckCurrentSerial = answerStoreObject
+
# Error packet handlers.
def error(self, conn, code, message):
@@ -492,6 +497,8 @@ class EventHandler(object):
d[Packets.NotifyReady] = self.notifyReady
d[Packets.AskLastTransaction] = self.askLastTransaction
d[Packets.AnswerLastTransaction] = self.answerLastTransaction
+ d[Packets.AskCheckCurrentSerial] = self.askCheckCurrentSerial
+ d[Packets.AnswerCheckCurrentSerial] = self.answerCheckCurrentSerial
return d
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -1591,6 +1591,28 @@ class AnswerHasLock(Packet):
oid, state = unpack(self._header_format, body)
return (oid, _decodeLockState(state))
+class AskCheckCurrentSerial(Packet):
+ """
+ Verifies if given serial is current for object oid in the database, and
+ take a write lock on it (so that this state is not altered until
+ transaction ends).
+ """
+ _header_format = '!8s8s8s'
+
+ def _encode(self, tid, serial, oid):
+ return tid + serial + oid
+
+ def _decode(self, body):
+ return unpack(self._header_format, body)
+
+class AnswerCheckCurrentSerial(AnswerStoreObject):
+ """
+ Answer to AskCheckCurrentSerial.
+ Same structure as AnswerStoreObject, to handle the same way, except there
+ is nothing to invalidate in any client's cache.
+ """
+ pass
+
class AskBarrier(Packet):
"""
Initates a "network barrier", allowing the node sending this packet to know
@@ -1993,6 +2015,11 @@ class PacketRegistry(dict):
AskLastTransaction,
AnswerLastTransaction,
)
+ AskCheckCurrentSerial, AnswerCheckCurrentSerial = register(
+ 0x003D,
+ AskCheckCurrentSerial,
+ AnswerCheckCurrentSerial,
+ )
# build a "singleton"
Packets = PacketRegistry()
Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -139,3 +139,33 @@ class ClientOperationHandler(BaseClientA
history_list = []
conn.answer(Packets.AnswerObjectHistory(oid, history_list))
+ def askCheckCurrentSerial(self, conn, tid, serial, oid):
+ self._askCheckCurrentSerial(conn, tid, serial, oid, time.time())
+
+ def _askCheckCurrentSerial(self, conn, tid, serial, oid, request_time):
+ if tid not in self.app.tm:
+ # transaction was aborted, cancel this event
+ neo.logging.info('Forget serial check of %s:%s by %s delayed by '
+ '%s', dump(oid), dump(serial), dump(tid),
+ dump(self.app.tm.getLockingTID(oid)))
+ # send an answer as the client side is waiting for it
+ conn.answer(Packets.AnswerStoreObject(0, oid, serial))
+ return
+ try:
+ self.app.tm.checkCurrentSerial(tid, serial, oid)
+ except ConflictError, err:
+ # resolvable or not
+ conn.answer(Packets.AnswerCheckCurrentSerial(1, oid,
+ err.getTID()))
+ except DelayedError:
+ # locked by a previous transaction, retry later
+ self.app.queueEvent(self._askCheckCurrentSerial, conn, tid, serial,
+ oid, request_time)
+ else:
+ if SLOW_STORE is not None:
+ duration = time.time() - request_time
+ if duration > SLOW_STORE:
+ neo.logging.info('CheckCurrentSerial delay: %.02fs',
+ duration)
+ conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
+
Modified: trunk/neo/storage/transactions.py
==============================================================================
--- trunk/neo/storage/transactions.py [iso-8859-1] (original)
+++ trunk/neo/storage/transactions.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -52,6 +52,7 @@ class Transaction(object):
self._transaction = None
self._locked = False
self._birth = time()
+ self._checked_set = set()
def __repr__(self):
return "<%s(tid=%r, uuid=%r, locked=%r, age=%.2fs)> at %x" % (
@@ -63,6 +64,9 @@ class Transaction(object):
id(self),
)
+ def addCheckedObject(self, oid):
+ self._checked_set.add(oid)
+
def getTID(self):
return self._tid
@@ -99,6 +103,9 @@ class Transaction(object):
def getOIDList(self):
return self._object_dict.keys()
+ def getLockedOIDList(self):
+ return self._object_dict.keys() + list(self._checked_set)
+
def getTransactionInformations(self):
return self._transaction
@@ -191,10 +198,13 @@ class TransactionManager(object):
def getLockingTID(self, oid):
return self._store_lock_dict.get(oid)
- def storeObject(self, tid, serial, oid, compression, checksum, data,
- value_serial):
+ def lockObject(self, tid, serial, oid):
"""
- Store an object received from client node
+ Take a write lock on given object, checking that "serial" is
+ current.
+ Raises:
+ DelayedError
+ ConflictError
"""
# check if the object if locked
locking_tid = self._store_lock_dict.get(oid)
@@ -222,6 +232,18 @@ class TransactionManager(object):
dump(oid), dump(tid), dump(locking_tid))
raise ConflictError(locking_tid)
+ def checkCurrentSerial(self, tid, serial, oid):
+ self.lockObject(tid, serial, oid)
+ assert tid in self, "Transaction not registered"
+ transaction = self._transaction_dict[tid]
+ transaction.addCheckedObject(oid)
+
+ def storeObject(self, tid, serial, oid, compression, checksum, data,
+ value_serial):
+ """
+ Store an object received from client node
+ """
+ self.lockObject(tid, serial, oid)
# store object
assert tid in self, "Transaction not registered"
transaction = self._transaction_dict[tid]
@@ -245,12 +267,12 @@ class TransactionManager(object):
if not even_if_locked and has_load_lock:
return
# unlock any object
- for oid in transaction.getOIDList():
+ for oid in transaction.getLockedOIDList():
if has_load_lock:
- lock_tid = self._load_lock_dict.pop(oid)
- assert lock_tid == tid, 'Transaction %s tried to release ' \
- 'the lock on oid %s, but it was held by %s' % (dump(tid),
- dump(oid), dump(lock_tid))
+ lock_tid = self._load_lock_dict.pop(oid, None)
+ assert lock_tid in (tid, None), 'Transaction %s tried to ' \
+ 'release the lock on oid %s, but it was held by %s' % (
+ dump(tid), dump(oid), dump(lock_tid))
del self._store_lock_dict[oid]
# remove the transaction
uuid = transaction.getUUID()
Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -702,6 +702,16 @@ class ProtocolTests(NeoUnitTestBase):
ptid = p.decode()[0]
self.assertEqual(ptid, tid)
+ def test_AskCheckCurrentSerial(self):
+ tid = self.getNextTID()
+ serial = self.getNextTID()
+ oid = self.getNextTID()
+ p = Packets.AskCheckCurrentSerial(tid, serial, oid)
+ ptid, pserial, poid = p.decode()
+ self.assertEqual(ptid, tid)
+ self.assertEqual(pserial, serial)
+ self.assertEqual(poid, oid)
+
if __name__ == '__main__':
unittest.main()
More information about the Neo-report
mailing list