[Neo-report] r2515 vincent - in /trunk/neo: ./ client/ client/handlers/ storage/ storage/h...

nobody at svn.erp5.org nobody at svn.erp5.org
Fri Dec 10 18:08:08 CET 2010


Author: vincent
Date: Fri Dec 10 18:08:08 2010
New Revision: 2515

Log:
Provide ReadVerifyingStorage.

Also, fix bogus checkCurrentSerialInTransaction implementation (it was not
properly locking object, allowing them to become non-current by the time
tpc_finish occurs).

Modified:
    trunk/neo/client/Storage.py
    trunk/neo/client/app.py
    trunk/neo/client/handlers/storage.py
    trunk/neo/handler.py
    trunk/neo/protocol.py
    trunk/neo/storage/handlers/client.py
    trunk/neo/storage/transactions.py
    trunk/neo/tests/testProtocol.py

Modified: trunk/neo/client/Storage.py
==============================================================================
--- trunk/neo/client/Storage.py [iso-8859-1] (original)
+++ trunk/neo/client/Storage.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -53,6 +53,7 @@ class Storage(BaseStorage.BaseStorage,
         # ZODB.interfaces.IStorageIteration,
         ZODB.interfaces.IStorageUndoable,
         ZODB.interfaces.IExternalGC,
+        ZODB.interfaces.ReadVerifyingStorage,
     )
 
     def __init__(self, master_nodes, name, connector=None, read_only=False,

Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -702,33 +702,40 @@ class Application(object):
             data = data_dict[oid]
             tid = local_var.tid
             resolved = False
-            if conflict_serial <= tid:
-                new_data = tryToResolveConflict(oid, conflict_serial, serial,
-                    data)
-                if new_data is not None:
-                    neo.logging.info('Conflict resolution succeed for ' \
-                        '%r:%r with %r', dump(oid), dump(serial),
-                        dump(conflict_serial))
-                    # Mark this conflict as resolved
-                    resolved_serial_set.update(conflict_serial_dict.pop(oid))
-                    # Try to store again
-                    self._store(oid, conflict_serial, new_data)
-                    append(oid)
-                    resolved = True
+            if data is not None:
+                if conflict_serial <= tid:
+                    new_data = tryToResolveConflict(oid, conflict_serial,
+                        serial, data)
+                    if new_data is not None:
+                        neo.logging.info('Conflict resolution succeed for ' \
+                            '%r:%r with %r', dump(oid), dump(serial),
+                            dump(conflict_serial))
+                        # Mark this conflict as resolved
+                        resolved_serial_set.update(conflict_serial_dict.pop(
+                            oid))
+                        # Try to store again
+                        self._store(oid, conflict_serial, new_data)
+                        append(oid)
+                        resolved = True
+                    else:
+                        neo.logging.info('Conflict resolution failed for ' \
+                            '%r:%r with %r', dump(oid), dump(serial),
+                            dump(conflict_serial))
                 else:
-                    neo.logging.info('Conflict resolution failed for ' \
-                        '%r:%r with %r', dump(oid), dump(serial),
-                        dump(conflict_serial))
-            else:
-                neo.logging.info('Conflict reported for %r:%r with later ' \
-                    'transaction %r , cannot resolve conflict.', dump(oid),
-                    dump(serial), dump(conflict_serial))
+                    neo.logging.info('Conflict reported for %r:%r with ' \
+                        'later transaction %r , cannot resolve conflict.',
+                        dump(oid), dump(serial), dump(conflict_serial))
             if not resolved:
                 # XXX: Is it really required to remove from data_dict ?
                 del data_dict[oid]
                 local_var.data_list.remove(oid)
-                raise ConflictError(oid=oid,
-                    serials=(tid, serial), data=data)
+                if data is None:
+                    exc = ReadConflictError(oid=oid, serials=(conflict_serial,
+                        serial))
+                else:
+                    exc = ConflictError(oid=oid, serials=(tid, serial),
+                        data=data)
+                raise exc
         return result
 
     @profiler_decorator
@@ -1252,9 +1259,31 @@ class Application(object):
         return self._load(oid)[1]
 
     def checkCurrentSerialInTransaction(self, oid, serial, transaction):
-        if transaction is not self.local_var.txn:
+        local_var = self.local_var
+        if transaction is not local_var.txn:
               raise StorageTransactionError(self, transaction)
-        committed_tid = self.getLastTID(oid)
-        if committed_tid != serial:
-            raise ReadConflictError(oid=oid, serials=(committed_tid, serial))
+        cell_list = self._getCellListForOID(oid, writable=True)
+        if len(cell_list) == 0:
+            raise NEOStorageError
+        p = Packets.AskCheckCurrentSerial(local_var.tid, serial, oid)
+        getConnForCell = self.cp.getConnForCell
+        queue = local_var.queue
+        local_var.object_serial_dict[oid] = serial
+        # Placeholders
+        local_var.object_stored_counter_dict[oid] = {}
+        data_dict = local_var.data_dict
+        if oid not in data_dict:
+            # Marker value so we don't try to resolve conflicts.
+            data_dict[oid] = None
+            local_var.data_list.append(oid)
+        for cell in cell_list:
+            conn = getConnForCell(cell)
+            if conn is None:
+                continue
+            try:
+                conn.ask(p, queue=queue)
+            except ConnectionClosed:
+                continue
+
+        self._waitAnyMessage(False)
 

Modified: trunk/neo/client/handlers/storage.py
==============================================================================
--- trunk/neo/client/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/storage.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -89,6 +89,8 @@ class StorageAnswersHandler(AnswerBaseHa
             object_stored_counter_dict[serial] = \
                 object_stored_counter_dict.get(serial, 0) + 1
 
+    answerCheckCurrentSerial = answerStoreObject
+
     def answerStoreTransaction(self, conn, tid):
         if tid != self.app.getTID():
             raise ProtocolError('Wrong TID, transaction not started')

Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -372,6 +372,11 @@ class EventHandler(object):
     def answerLastTransaction(self, conn, tid):
         raise UnexpectedPacketError
 
+    def askCheckCurrentSerial(self, conn, tid, serial, oid):
+        raise UnexpectedPacketError
+
+    answerCheckCurrentSerial = answerStoreObject
+
     # Error packet handlers.
 
     def error(self, conn, code, message):
@@ -492,6 +497,8 @@ class EventHandler(object):
         d[Packets.NotifyReady] = self.notifyReady
         d[Packets.AskLastTransaction] = self.askLastTransaction
         d[Packets.AnswerLastTransaction] = self.answerLastTransaction
+        d[Packets.AskCheckCurrentSerial] = self.askCheckCurrentSerial
+        d[Packets.AnswerCheckCurrentSerial] = self.answerCheckCurrentSerial
 
         return d
 

Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -1591,6 +1591,28 @@ class AnswerHasLock(Packet):
         oid, state = unpack(self._header_format, body)
         return (oid, _decodeLockState(state))
 
+class AskCheckCurrentSerial(Packet):
+    """
+    Verifies if given serial is current for object oid in the database, and
+    take a write lock on it (so that this state is not altered until
+    transaction ends).
+    """
+    _header_format = '!8s8s8s'
+
+    def _encode(self, tid, serial, oid):
+        return tid + serial + oid
+
+    def _decode(self, body):
+        return unpack(self._header_format, body)
+
+class AnswerCheckCurrentSerial(AnswerStoreObject):
+    """
+    Answer to AskCheckCurrentSerial.
+    Same structure as AnswerStoreObject, to handle the same way, except there
+    is nothing to invalidate in any client's cache.
+    """
+    pass
+
 class AskBarrier(Packet):
     """
     Initates a "network barrier", allowing the node sending this packet to know
@@ -1993,6 +2015,11 @@ class PacketRegistry(dict):
             AskLastTransaction,
             AnswerLastTransaction,
             )
+    AskCheckCurrentSerial, AnswerCheckCurrentSerial = register(
+            0x003D,
+            AskCheckCurrentSerial,
+            AnswerCheckCurrentSerial,
+            )
 
 # build a "singleton"
 Packets = PacketRegistry()

Modified: trunk/neo/storage/handlers/client.py
==============================================================================
--- trunk/neo/storage/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/client.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -139,3 +139,33 @@ class ClientOperationHandler(BaseClientA
             history_list = []
         conn.answer(Packets.AnswerObjectHistory(oid, history_list))
 
+    def askCheckCurrentSerial(self, conn, tid, serial, oid):
+        self._askCheckCurrentSerial(conn, tid, serial, oid, time.time())
+
+    def _askCheckCurrentSerial(self, conn, tid, serial, oid, request_time):
+        if tid not in self.app.tm:
+            # transaction was aborted, cancel this event
+            neo.logging.info('Forget serial check of %s:%s by %s delayed by '
+                '%s', dump(oid), dump(serial), dump(tid),
+                dump(self.app.tm.getLockingTID(oid)))
+            # send an answer as the client side is waiting for it
+            conn.answer(Packets.AnswerStoreObject(0, oid, serial))
+            return
+        try:
+            self.app.tm.checkCurrentSerial(tid, serial, oid)
+        except ConflictError, err:
+            # resolvable or not
+            conn.answer(Packets.AnswerCheckCurrentSerial(1, oid,
+                err.getTID()))
+        except DelayedError:
+            # locked by a previous transaction, retry later
+            self.app.queueEvent(self._askCheckCurrentSerial, conn, tid, serial,
+                oid, request_time)
+        else:
+            if SLOW_STORE is not None:
+                duration = time.time() - request_time
+                if duration > SLOW_STORE:
+                    neo.logging.info('CheckCurrentSerial delay: %.02fs',
+                        duration)
+            conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
+

Modified: trunk/neo/storage/transactions.py
==============================================================================
--- trunk/neo/storage/transactions.py [iso-8859-1] (original)
+++ trunk/neo/storage/transactions.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -52,6 +52,7 @@ class Transaction(object):
         self._transaction = None
         self._locked = False
         self._birth = time()
+        self._checked_set = set()
 
     def __repr__(self):
         return "<%s(tid=%r, uuid=%r, locked=%r, age=%.2fs)> at %x" % (
@@ -63,6 +64,9 @@ class Transaction(object):
             id(self),
         )
 
+    def addCheckedObject(self, oid):
+        self._checked_set.add(oid)
+
     def getTID(self):
         return self._tid
 
@@ -99,6 +103,9 @@ class Transaction(object):
     def getOIDList(self):
         return self._object_dict.keys()
 
+    def getLockedOIDList(self):
+        return self._object_dict.keys() + list(self._checked_set)
+
     def getTransactionInformations(self):
         return self._transaction
 
@@ -191,10 +198,13 @@ class TransactionManager(object):
     def getLockingTID(self, oid):
         return self._store_lock_dict.get(oid)
 
-    def storeObject(self, tid, serial, oid, compression, checksum, data,
-            value_serial):
+    def lockObject(self, tid, serial, oid):
         """
-            Store an object received from client node
+            Take a write lock on given object, checking that "serial" is
+            current.
+            Raises:
+                DelayedError
+                ConflictError
         """
         # check if the object if locked
         locking_tid = self._store_lock_dict.get(oid)
@@ -222,6 +232,18 @@ class TransactionManager(object):
                 dump(oid), dump(tid), dump(locking_tid))
             raise ConflictError(locking_tid)
 
+    def checkCurrentSerial(self, tid, serial, oid):
+        self.lockObject(tid, serial, oid)
+        assert tid in self, "Transaction not registered"
+        transaction = self._transaction_dict[tid]
+        transaction.addCheckedObject(oid)
+
+    def storeObject(self, tid, serial, oid, compression, checksum, data,
+            value_serial):
+        """
+            Store an object received from client node
+        """
+        self.lockObject(tid, serial, oid)
         # store object
         assert tid in self, "Transaction not registered"
         transaction = self._transaction_dict[tid]
@@ -245,12 +267,12 @@ class TransactionManager(object):
         if not even_if_locked and has_load_lock:
             return
         # unlock any object
-        for oid in transaction.getOIDList():
+        for oid in transaction.getLockedOIDList():
             if has_load_lock:
-                lock_tid = self._load_lock_dict.pop(oid)
-                assert lock_tid == tid, 'Transaction %s tried to release ' \
-                    'the lock on oid %s, but it was held by %s' % (dump(tid),
-                    dump(oid), dump(lock_tid))
+                lock_tid = self._load_lock_dict.pop(oid, None)
+                assert lock_tid in (tid, None), 'Transaction %s tried to ' \
+                    'release the lock on oid %s, but it was held by %s' % (
+                    dump(tid), dump(oid), dump(lock_tid))
             del self._store_lock_dict[oid]
         # remove the transaction
         uuid = transaction.getUUID()

Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Fri Dec 10 18:08:08 2010
@@ -702,6 +702,16 @@ class ProtocolTests(NeoUnitTestBase):
         ptid = p.decode()[0]
         self.assertEqual(ptid, tid)
 
+    def test_AskCheckCurrentSerial(self):
+        tid = self.getNextTID()
+        serial = self.getNextTID()
+        oid = self.getNextTID()
+        p = Packets.AskCheckCurrentSerial(tid, serial, oid)
+        ptid, pserial, poid = p.decode()
+        self.assertEqual(ptid, tid)
+        self.assertEqual(pserial, serial)
+        self.assertEqual(poid, oid)
+
 if __name__ == '__main__':
     unittest.main()
 




More information about the Neo-report mailing list