[Neo-report] r2449 vincent - /trunk/neo/tests/storage/

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Nov 9 20:41:21 CET 2010


Author: vincent
Date: Tue Nov  9 20:41:21 2010
New Revision: 2449

Log:
Split database test into two parts: generic and mysql-specific.

Added:
    trunk/neo/tests/storage/testStorageDBTests.py
      - copied, changed from r2448, trunk/neo/tests/storage/testStorageMySQLdb.py
Modified:
    trunk/neo/tests/storage/testStorageMySQLdb.py

Copied: trunk/neo/tests/storage/testStorageDBTests.py (from r2448, trunk/neo/tests/storage/testStorageMySQLdb.py)
==============================================================================
--- trunk/neo/tests/storage/testStorageMySQLdb.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageDBTests.py [iso-8859-1] Tue Nov  9 20:41:21 2010
@@ -16,7 +16,6 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 import unittest
-import MySQLdb
 from mock import Mock
 from neo.util import dump, p64, u64
 from neo.protocol import CellStates, ZERO_OID, ZERO_TID, MAX_TID
@@ -24,129 +23,21 @@ from neo.tests import NeoUnitTestBase
 from neo.exception import DatabaseFailure
 from neo.storage.database.mysqldb import MySQLDatabaseManager
 
-NEO_SQL_DATABASE = 'test_mysqldb0'
-NEO_SQL_USER = 'test'
-
-class StorageMySQSLdbTests(NeoUnitTestBase):
+class StorageDBTests(NeoUnitTestBase):
 
     def setUp(self):
         NeoUnitTestBase.setUp(self)
-        self.prepareDatabase(number=1, prefix=NEO_SQL_DATABASE[:-1])
-        # db manager
-        database = '%s@%s' % (NEO_SQL_USER, NEO_SQL_DATABASE)
-        self.db = MySQLDatabaseManager(database)
-        self.db.setup()
-        self.db.setNumPartitions(1)
+        self.db = self.getDB()
 
     def tearDown(self):
-        self.db.close()
+        self.closeDB()
         NeoUnitTestBase.tearDown(self)
 
-    def checkCalledQuery(self, query=None, call=0):
-        self.assertTrue(len(self.db.conn.mockGetNamedCalls('query')) > call)
-        call = self.db.conn.mockGetNamedCalls('query')[call]
-        call.checkArgs('BEGIN')
-
-    def test_MySQLDatabaseManagerInit(self):
-        db = MySQLDatabaseManager('%s@%s' % (NEO_SQL_USER, NEO_SQL_DATABASE))
-        # init
-        self.assertEquals(db.db, NEO_SQL_DATABASE)
-        self.assertEquals(db.user, NEO_SQL_USER)
-        # & connect
-        self.assertTrue(isinstance(db.conn, MySQLdb.connection))
-        self.assertEquals(db.isUnderTransaction(), False)
-
-    def test_begin(self):
-        # no current transaction
-        self.db.conn = Mock({ })
-        self.assertEquals(self.db.isUnderTransaction(), False)
-        self.db.begin()
-        self.checkCalledQuery(query='COMMIT')
-        self.assertEquals(self.db.isUnderTransaction(), True)
-
-    def test_commit(self):
-        self.db.conn = Mock()
-        self.db.begin()
-        self.db.commit()
-        self.assertEquals(len(self.db.conn.mockGetNamedCalls('commit')), 1)
-        self.assertEquals(self.db.isUnderTransaction(), False)
-
-    def test_rollback(self):
-        # rollback called and no current transaction
-        self.db.conn = Mock({ })
-        self.db.under_transaction = True
-        self.db.rollback()
-        self.assertEquals(len(self.db.conn.mockGetNamedCalls('rollback')), 1)
-        self.assertEquals(self.db.isUnderTransaction(), False)
-
-    def test_query1(self):
-        # fake result object
-        from array import array
-        result_object = Mock({
-            "num_rows": 1,
-            "fetch_row": ((1, 2, array('b', (1, 2, ))), ),
-        })
-        # expected formatted result
-        expected_result = (
-            (1, 2, '\x01\x02', ),
-        )
-        self.db.conn = Mock({ 'store_result': result_object })
-        result = self.db.query('QUERY')
-        self.assertEquals(result, expected_result)
-        calls = self.db.conn.mockGetNamedCalls('query')
-        self.assertEquals(len(calls), 1)
-        calls[0].checkArgs('QUERY')
-
-    def test_query2(self):
-        # test the OperationalError exception
-        # fake object, raise exception during the first call
-        from MySQLdb import OperationalError
-        from MySQLdb.constants.CR import SERVER_GONE_ERROR
-        class FakeConn(object):
-            def query(*args):
-                raise OperationalError(SERVER_GONE_ERROR, 'this is a test')
-        self.db.conn = FakeConn()
-        self.connect_called = False
-        def connect_hook():
-            # mock object, break raise/connect loop
-            self.db.conn = Mock({'num_rows': 0})
-            self.connect_called = True
-        self.db._connect = connect_hook
-        # make a query, exception will be raised then connect() will be
-        # called and the second query will use the mock object
-        self.db.query('QUERY')
-        self.assertTrue(self.connect_called)
-
-    def test_query3(self):
-        # OperationalError > raise DatabaseFailure exception
-        from MySQLdb import OperationalError
-        class FakeConn(object):
-            def close(self):
-                pass
-            def query(*args):
-                raise OperationalError(-1, 'this is a test')
-        self.db.conn = FakeConn()
-        self.assertRaises(DatabaseFailure, self.db.query, 'QUERY')
-
-    def test_escape(self):
-        self.assertEquals(self.db.escape('a"b'), 'a\\"b')
-        self.assertEquals(self.db.escape("a'b"), "a\\'b")
-
-    def test_setup(self):
-        # XXX: this test verifies irrelevant symptoms. It should instead check that
-        # - setup, store, setup, load -> data still there
-        # - setup, store, setup(reset=True), load -> data not found
+    def getDB(self):
+        raise NotImplementedError
 
-        # create all tables
-        self.db.conn = Mock()
-        self.db.setup()
-        calls = self.db.conn.mockGetNamedCalls('query')
-        self.assertEquals(len(calls), 7)
-        # create all tables but drop them first
-        self.db.conn = Mock()
-        self.db.setup(reset=True)
-        calls = self.db.conn.mockGetNamedCalls('query')
-        self.assertEquals(len(calls), 8)
+    def closeDB(self):
+        pass
 
     def test_configuration(self):
         # check if a configuration entry is well written

Modified: trunk/neo/tests/storage/testStorageMySQLdb.py
==============================================================================
--- trunk/neo/tests/storage/testStorageMySQLdb.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageMySQLdb.py [iso-8859-1] Tue Nov  9 20:41:21 2010
@@ -18,29 +18,26 @@
 import unittest
 import MySQLdb
 from mock import Mock
-from neo.util import dump, p64, u64
-from neo.protocol import CellStates, ZERO_OID, ZERO_TID, MAX_TID
-from neo.tests import NeoUnitTestBase
 from neo.exception import DatabaseFailure
+from neo.tests.storage.testStorageDBTests import StorageDBTests
 from neo.storage.database.mysqldb import MySQLDatabaseManager
 
 NEO_SQL_DATABASE = 'test_mysqldb0'
 NEO_SQL_USER = 'test'
 
-class StorageMySQSLdbTests(NeoUnitTestBase):
+class StorageMySQSLdbTests(StorageDBTests):
 
-    def setUp(self):
-        NeoUnitTestBase.setUp(self)
+    def getDB(self):
         self.prepareDatabase(number=1, prefix=NEO_SQL_DATABASE[:-1])
         # db manager
         database = '%s@%s' % (NEO_SQL_USER, NEO_SQL_DATABASE)
-        self.db = MySQLDatabaseManager(database)
-        self.db.setup()
-        self.db.setNumPartitions(1)
+        db = MySQLDatabaseManager(database)
+        db.setup()
+        db.setNumPartitions(1)
+        return db
 
-    def tearDown(self):
+    def closeDB(self):
         self.db.close()
-        NeoUnitTestBase.tearDown(self)
 
     def checkCalledQuery(self, query=None, call=0):
         self.assertTrue(len(self.db.conn.mockGetNamedCalls('query')) > call)
@@ -136,6 +133,7 @@ class StorageMySQSLdbTests(NeoUnitTestBa
         # XXX: this test verifies irrelevant symptoms. It should instead check that
         # - setup, store, setup, load -> data still there
         # - setup, store, setup(reset=True), load -> data not found
+        # Then, it should be moved to generic test class.
 
         # create all tables
         self.db.conn = Mock()
@@ -148,667 +146,7 @@ class StorageMySQSLdbTests(NeoUnitTestBa
         calls = self.db.conn.mockGetNamedCalls('query')
         self.assertEquals(len(calls), 8)
 
-    def test_configuration(self):
-        # check if a configuration entry is well written
-        self.db.setConfiguration('a', 'c')
-        result = self.db.getConfiguration('a')
-        self.assertEquals(result, 'c')
-
-    def checkConfigEntry(self, get_call, set_call, value):
-        # generic test for all configuration entries accessors
-        self.assertRaises(KeyError, get_call)
-        set_call(value)
-        self.assertEquals(get_call(), value)
-        set_call(value * 2)
-        self.assertEquals(get_call(), value * 2)
-
-    def test_UUID(self):
-        self.checkConfigEntry(self.db.getUUID, self.db.setUUID, 'TEST_VALUE')
-
-    def test_NumPartitions(self):
-        self.db.setup(reset=True)
-        self.checkConfigEntry(self.db.getNumPartitions,
-                self.db.setNumPartitions, 10)
-
-    def test_Name(self):
-        self.checkConfigEntry(self.db.getName, self.db.setName, 'TEST_NAME')
-
-    def test_15_PTID(self):
-        self.checkConfigEntry(
-            get_call=self.db.getPTID,
-            set_call=self.db.setPTID,
-            value=self.getPTID(1))
-
-    def test_getPartitionTable(self):
-        ptid = self.getPTID(1)
-        uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
-        cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
-        cell2 = (1, uuid1, CellStates.UP_TO_DATE)
-        self.db.setPartitionTable(ptid, [cell1, cell2])
-        result = self.db.getPartitionTable()
-        self.assertEqual(set(result), set([cell1, cell2]))
-
-    def test_getLastOID(self):
-        oid1 = self.getOID(1)
-        self.db.setLastOID(oid1)
-        result1 = self.db.getLastOID()
-        self.assertEquals(result1, oid1)
-
-    def getOIDs(self, count):
-        return [self.getOID(i) for i in xrange(count)]
-
-    def getTIDs(self, count):
-        tid_list = [self.getNextTID()]
-        while len(tid_list) != count:
-            tid_list.append(self.getNextTID(tid_list[-1]))
-        return tid_list
-
-    def getTransaction(self, oid_list):
-        transaction = (oid_list, 'user', 'desc', 'ext', False)
-        object_list = [(oid, 1, 0, '', None) for oid in oid_list]
-        return (transaction, object_list)
-
-    def checkSet(self, list1, list2):
-        self.assertEqual(set(list1), set(list2))
-
-    def test_getLastTID(self):
-        tid1, tid2, tid3, tid4 = self.getTIDs(4)
-        oid1, oid2 = self.getOIDs(2)
-        txn, objs = self.getTransaction([oid1, oid2])
-        # max TID is in obj table
-        self.db.storeTransaction(tid1, objs, txn, False)
-        self.db.storeTransaction(tid2, objs, txn, False)
-        self.assertEqual(self.db.getLastTID(), tid2)
-        # max tid is in ttrans table
-        self.db.storeTransaction(tid3, objs, txn)
-        result = self.db.getLastTID()
-        self.assertEqual(self.db.getLastTID(), tid3)
-        # max tid is in tobj (serial)
-        self.db.storeTransaction(tid4, objs, None)
-        self.assertEqual(self.db.getLastTID(), tid4)
-
-    def test_getUnfinishedTIDList(self):
-        tid1, tid2, tid3, tid4 = self.getTIDs(4)
-        oid1, oid2 = self.getOIDs(2)
-        txn, objs = self.getTransaction([oid1, oid2])
-        # nothing pending
-        self.db.storeTransaction(tid1, objs, txn, False)
-        self.checkSet(self.db.getUnfinishedTIDList(), [])
-        # one unfinished txn
-        self.db.storeTransaction(tid2, objs, txn)
-        self.checkSet(self.db.getUnfinishedTIDList(), [tid2])
-        # no changes
-        self.db.storeTransaction(tid3, objs, None, False)
-        self.checkSet(self.db.getUnfinishedTIDList(), [tid2])
-        # a second txn known by objs only
-        self.db.storeTransaction(tid4, objs, None)
-        self.checkSet(self.db.getUnfinishedTIDList(), [tid2, tid4])
-
-    def test_objectPresent(self):
-        tid = self.getNextTID()
-        oid = self.getOID(1)
-        txn, objs = self.getTransaction([oid])
-        # not present
-        self.assertFalse(self.db.objectPresent(oid, tid, all=True))
-        self.assertFalse(self.db.objectPresent(oid, tid, all=False))
-        # available in temp table
-        self.db.storeTransaction(tid, objs, txn)
-        self.assertTrue(self.db.objectPresent(oid, tid, all=True))
-        self.assertFalse(self.db.objectPresent(oid, tid, all=False))
-        # available in both tables
-        self.db.finishTransaction(tid)
-        self.assertTrue(self.db.objectPresent(oid, tid, all=True))
-        self.assertTrue(self.db.objectPresent(oid, tid, all=False))
-
-    def test_getObject(self):
-        oid1, = self.getOIDs(1)
-        tid1, tid2 = self.getTIDs(2)
-        FOUND_BUT_NOT_VISIBLE = False
-        OBJECT_T1_NO_NEXT = (tid1, None, 1, 0, '', None)
-        OBJECT_T1_NEXT = (tid1, tid2, 1, 0, '', None)
-        OBJECT_T2 = (tid2, None, 1, 0, '', None)
-        txn1, objs1 = self.getTransaction([oid1])
-        txn2, objs2 = self.getTransaction([oid1])
-        # non-present
-        self.assertEqual(self.db.getObject(oid1), None)
-        self.assertEqual(self.db.getObject(oid1, tid1), None)
-        self.assertEqual(self.db.getObject(oid1, before_tid=tid1), None)
-        # one non-commited version
-        self.db.storeTransaction(tid1, objs1, txn1)
-        self.assertEqual(self.db.getObject(oid1), None)
-        self.assertEqual(self.db.getObject(oid1, tid1), None)
-        self.assertEqual(self.db.getObject(oid1, before_tid=tid1), None)
-        # one commited version
-        self.db.finishTransaction(tid1)
-        self.assertEqual(self.db.getObject(oid1), OBJECT_T1_NO_NEXT)
-        self.assertEqual(self.db.getObject(oid1, tid1), OBJECT_T1_NO_NEXT)
-        self.assertEqual(self.db.getObject(oid1, before_tid=tid1),
-            FOUND_BUT_NOT_VISIBLE)
-        # two version available, one non-commited
-        self.db.storeTransaction(tid2, objs2, txn2)
-        self.assertEqual(self.db.getObject(oid1), OBJECT_T1_NO_NEXT)
-        self.assertEqual(self.db.getObject(oid1, tid1), OBJECT_T1_NO_NEXT)
-        self.assertEqual(self.db.getObject(oid1, before_tid=tid1),
-            FOUND_BUT_NOT_VISIBLE)
-        self.assertEqual(self.db.getObject(oid1, tid2), FOUND_BUT_NOT_VISIBLE)
-        self.assertEqual(self.db.getObject(oid1, before_tid=tid2),
-            OBJECT_T1_NO_NEXT)
-        # two commited versions
-        self.db.finishTransaction(tid2)
-        self.assertEqual(self.db.getObject(oid1), OBJECT_T2)
-        self.assertEqual(self.db.getObject(oid1, tid1), OBJECT_T1_NO_NEXT)
-        self.assertEqual(self.db.getObject(oid1, before_tid=tid1),
-            FOUND_BUT_NOT_VISIBLE)
-        self.assertEqual(self.db.getObject(oid1, tid2), OBJECT_T2)
-        self.assertEqual(self.db.getObject(oid1, before_tid=tid2),
-            OBJECT_T1_NEXT)
-
-    def test_setPartitionTable(self):
-        ptid = self.getPTID(1)
-        uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
-        cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
-        cell2 = (1, uuid1, CellStates.UP_TO_DATE)
-        cell3 = (1, uuid1, CellStates.DISCARDED)
-        # no partition table
-        self.assertEqual(self.db.getPartitionTable(), [])
-        # set one
-        self.db.setPartitionTable(ptid, [cell1])
-        result = self.db.getPartitionTable()
-        self.assertEqual(result, [cell1])
-        # then another
-        self.db.setPartitionTable(ptid, [cell2])
-        result = self.db.getPartitionTable()
-        self.assertEqual(result, [cell2])
-        # drop discarded cells
-        self.db.setPartitionTable(ptid, [cell2, cell3])
-        result = self.db.getPartitionTable()
-        self.assertEqual(result, [])
-
-    def test_changePartitionTable(self):
-        ptid = self.getPTID(1)
-        uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
-        cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
-        cell2 = (1, uuid1, CellStates.UP_TO_DATE)
-        cell3 = (1, uuid1, CellStates.DISCARDED)
-        # no partition table
-        self.assertEqual(self.db.getPartitionTable(), [])
-        # set one
-        self.db.changePartitionTable(ptid, [cell1])
-        result = self.db.getPartitionTable()
-        self.assertEqual(result, [cell1])
-        # add more entries
-        self.db.changePartitionTable(ptid, [cell2])
-        result = self.db.getPartitionTable()
-        self.assertEqual(set(result), set([cell1, cell2]))
-        # drop discarded cells
-        self.db.changePartitionTable(ptid, [cell2, cell3])
-        result = self.db.getPartitionTable()
-        self.assertEqual(result, [cell1])
-
-    def test_dropUnfinishedData(self):
-        oid1, oid2 = self.getOIDs(2)
-        tid1, tid2 = self.getTIDs(2)
-        txn1, objs1 = self.getTransaction([oid1])
-        txn2, objs2 = self.getTransaction([oid1])
-        # nothing
-        self.assertEqual(self.db.getObject(oid1), None)
-        self.assertEqual(self.db.getObject(oid2), None)
-        self.assertEqual(self.db.getUnfinishedTIDList(), [])
-        # one is still pending
-        self.db.storeTransaction(tid1, objs1, txn1)
-        self.db.storeTransaction(tid2, objs2, txn2)
-        self.db.finishTransaction(tid1)
-        result = self.db.getObject(oid1)
-        self.assertEqual(result, (tid1, None, 1, 0, '', None))
-        self.assertEqual(self.db.getObject(oid2), None)
-        self.assertEqual(self.db.getUnfinishedTIDList(), [tid2])
-        # drop it
-        self.db.dropUnfinishedData()
-        self.assertEqual(self.db.getUnfinishedTIDList(), [])
-        result = self.db.getObject(oid1)
-        self.assertEqual(result, (tid1, None, 1, 0, '', None))
-        self.assertEqual(self.db.getObject(oid2), None)
-
-    def test_storeTransaction(self):
-        oid1, oid2 = self.getOIDs(2)
-        tid1, tid2 = self.getTIDs(2)
-        txn1, objs1 = self.getTransaction([oid1])
-        txn2, objs2 = self.getTransaction([oid2])
-        # nothing in database
-        self.assertEqual(self.db.getLastTID(), None)
-        self.assertEqual(self.db.getUnfinishedTIDList(), [])
-        self.assertEqual(self.db.getObject(oid1), None)
-        self.assertEqual(self.db.getObject(oid2), None)
-        self.assertEqual(self.db.getTransaction(tid1, True), None)
-        self.assertEqual(self.db.getTransaction(tid2, True), None)
-        self.assertEqual(self.db.getTransaction(tid1, False), None)
-        self.assertEqual(self.db.getTransaction(tid2, False), None)
-        # store in temporary tables
-        self.db.storeTransaction(tid1, objs1, txn1)
-        self.db.storeTransaction(tid2, objs2, txn2)
-        result = self.db.getTransaction(tid1, True)
-        self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
-        result = self.db.getTransaction(tid2, True)
-        self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
-        self.assertEqual(self.db.getTransaction(tid1, False), None)
-        self.assertEqual(self.db.getTransaction(tid2, False), None)
-        # commit pending transaction
-        self.db.finishTransaction(tid1)
-        self.db.finishTransaction(tid2)
-        result = self.db.getTransaction(tid1, True)
-        self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
-        result = self.db.getTransaction(tid2, True)
-        self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
-        result = self.db.getTransaction(tid1, False)
-        self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
-        result = self.db.getTransaction(tid2, False)
-        self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
-
-    def test_askFinishTransaction(self):
-        oid1, oid2 = self.getOIDs(2)
-        tid1, tid2 = self.getTIDs(2)
-        txn1, objs1 = self.getTransaction([oid1])
-        txn2, objs2 = self.getTransaction([oid2])
-        # stored but not finished
-        self.db.storeTransaction(tid1, objs1, txn1)
-        self.db.storeTransaction(tid2, objs2, txn2)
-        result = self.db.getTransaction(tid1, True)
-        self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
-        result = self.db.getTransaction(tid2, True)
-        self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
-        self.assertEqual(self.db.getTransaction(tid1, False), None)
-        self.assertEqual(self.db.getTransaction(tid2, False), None)
-        # stored and finished
-        self.db.finishTransaction(tid1)
-        self.db.finishTransaction(tid2)
-        result = self.db.getTransaction(tid1, True)
-        self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
-        result = self.db.getTransaction(tid2, True)
-        self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
-        result = self.db.getTransaction(tid1, False)
-        self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
-        result = self.db.getTransaction(tid2, False)
-        self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
-
-    def test_deleteTransaction(self):
-        oid1, oid2 = self.getOIDs(2)
-        tid1, tid2 = self.getTIDs(2)
-        txn1, objs1 = self.getTransaction([oid1])
-        txn2, objs2 = self.getTransaction([oid2])
-        self.db.storeTransaction(tid1, objs1, txn1)
-        self.db.storeTransaction(tid2, objs2, txn2)
-        self.db.finishTransaction(tid1)
-        self.db.deleteTransaction(tid1, [oid1])
-        self.db.deleteTransaction(tid2, [oid2])
-        self.assertEqual(self.db.getTransaction(tid1, True), None)
-        self.assertEqual(self.db.getTransaction(tid2, True), None)
-
-    def test_deleteObject(self):
-        oid1, oid2 = self.getOIDs(2)
-        tid1, tid2 = self.getTIDs(2)
-        txn1, objs1 = self.getTransaction([oid1, oid2])
-        txn2, objs2 = self.getTransaction([oid1, oid2])
-        self.db.storeTransaction(tid1, objs1, txn1)
-        self.db.storeTransaction(tid2, objs2, txn2)
-        self.db.finishTransaction(tid1)
-        self.db.finishTransaction(tid2)
-        self.db.deleteObject(oid1)
-        self.assertEqual(self.db.getObject(oid1, tid=tid1), None)
-        self.assertEqual(self.db.getObject(oid1, tid=tid2), None)
-        self.db.deleteObject(oid2, serial=tid1)
-        self.assertEqual(self.db.getObject(oid2, tid=tid1), False)
-        self.assertEqual(self.db.getObject(oid2, tid=tid2), (tid2, None) + \
-            objs2[1][1:])
-
-    def test_getTransaction(self):
-        oid1, oid2 = self.getOIDs(2)
-        tid1, tid2 = self.getTIDs(2)
-        txn1, objs1 = self.getTransaction([oid1])
-        txn2, objs2 = self.getTransaction([oid2])
-        # get from temporary table or not
-        self.db.storeTransaction(tid1, objs1, txn1)
-        self.db.storeTransaction(tid2, objs2, txn2)
-        self.db.finishTransaction(tid1)
-        result = self.db.getTransaction(tid1, True)
-        self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
-        result = self.db.getTransaction(tid2, True)
-        self.assertEqual(result, ([oid2], 'user', 'desc', 'ext', False))
-        # get from non-temporary only
-        result = self.db.getTransaction(tid1, False)
-        self.assertEqual(result, ([oid1], 'user', 'desc', 'ext', False))
-        self.assertEqual(self.db.getTransaction(tid2, False), None)
-
-    def test_getObjectHistory(self):
-        oid = self.getOID(1)
-        tid1, tid2, tid3 = self.getTIDs(3)
-        txn1, objs1 = self.getTransaction([oid])
-        txn2, objs2 = self.getTransaction([oid])
-        txn3, objs3 = self.getTransaction([oid])
-        # one revision
-        self.db.storeTransaction(tid1, objs1, txn1)
-        self.db.finishTransaction(tid1)
-        result = self.db.getObjectHistory(oid, 0, 3)
-        self.assertEqual(result, [(tid1, 0)])
-        result = self.db.getObjectHistory(oid, 1, 1)
-        self.assertEqual(result, None)
-        # two revisions
-        self.db.storeTransaction(tid2, objs2, txn2)
-        self.db.finishTransaction(tid2)
-        result = self.db.getObjectHistory(oid, 0, 3)
-        self.assertEqual(result, [(tid2, 0), (tid1, 0)])
-        result = self.db.getObjectHistory(oid, 1, 3)
-        self.assertEqual(result, [(tid1, 0)])
-        result = self.db.getObjectHistory(oid, 2, 3)
-        self.assertEqual(result, None)
-
-    def test_getObjectHistoryFrom(self):
-        self.db.setup()
-        self.db.setNumPartitions(2)
-        oid1 = self.getOID(0)
-        oid2 = self.getOID(2)
-        oid3 = self.getOID(1)
-        tid1, tid2, tid3, tid4, tid5 = self.getTIDs(5)
-        txn1, objs1 = self.getTransaction([oid1])
-        txn2, objs2 = self.getTransaction([oid2])
-        txn3, objs3 = self.getTransaction([oid1])
-        txn4, objs4 = self.getTransaction([oid2])
-        txn5, objs5 = self.getTransaction([oid3])
-        self.db.storeTransaction(tid1, objs1, txn1)
-        self.db.storeTransaction(tid2, objs2, txn2)    
-        self.db.storeTransaction(tid3, objs3, txn3)
-        self.db.storeTransaction(tid4, objs4, txn4)
-        self.db.storeTransaction(tid5, objs5, txn5)
-        self.db.finishTransaction(tid1)
-        self.db.finishTransaction(tid2)
-        self.db.finishTransaction(tid3)
-        self.db.finishTransaction(tid4)
-        self.db.finishTransaction(tid5)
-        # Check full result
-        result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, MAX_TID, 10,
-            2, 0)
-        self.assertEqual(result, {
-            oid1: [tid1, tid3],
-            oid2: [tid2, tid4],
-        })
-        # Lower bound is inclusive
-        result = self.db.getObjectHistoryFrom(oid1, tid1, MAX_TID, 10, 2, 0)
-        self.assertEqual(result, {
-            oid1: [tid1, tid3],
-            oid2: [tid2, tid4],
-        })
-        # Upper bound is inclusive
-        result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, tid3, 10,
-            2, 0)
-        self.assertEqual(result, {
-            oid1: [tid1, tid3],
-            oid2: [tid2],
-        })
-        # Length is total number of serials
-        result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, MAX_TID, 3,
-            2, 0)
-        self.assertEqual(result, {
-            oid1: [tid1, tid3],
-            oid2: [tid2],
-        })
-        # Partition constraints are honored
-        result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, MAX_TID, 10,
-            2, 1)
-        self.assertEqual(result, {
-            oid3: [tid5],
-        })
-
-    def _storeTransactions(self, count):
-        # use OID generator to know result of tid % N
-        tid_list = self.getOIDs(count)
-        oid = self.getOID(1)
-        for tid in tid_list:
-            txn, objs = self.getTransaction([oid])
-            self.db.storeTransaction(tid, objs, txn)
-            self.db.finishTransaction(tid)
-        return tid_list
-
-    def test_getTIDList(self):
-        self.db.setup(True)
-        self.db.setNumPartitions(2)
-        tid1, tid2, tid3, tid4 = self._storeTransactions(4)
-        # get tids
-        # - all partitions
-        result = self.db.getTIDList(0, 4, 2, [0, 1])
-        self.checkSet(result, [tid1, tid2, tid3, tid4])
-        # - one partition
-        result = self.db.getTIDList(0, 4, 2, [0])
-        self.checkSet(result, [tid1, tid3])
-        result = self.db.getTIDList(0, 4, 2, [1])
-        self.checkSet(result, [tid2, tid4])
-        # get a subset of tids
-        result = self.db.getTIDList(0, 1, 2, [0])
-        self.checkSet(result, [tid3]) # desc order
-        result = self.db.getTIDList(1, 1, 2, [1])
-        self.checkSet(result, [tid2]) 
-        result = self.db.getTIDList(2, 2, 2, [0])
-        self.checkSet(result, [])
-
-    def test_getReplicationTIDList(self):
-        self.db.setup(True)
-        self.db.setNumPartitions(2)
-        tid1, tid2, tid3, tid4 = self._storeTransactions(4)
-        # get tids
-        # - all
-        result = self.db.getReplicationTIDList(ZERO_TID, MAX_TID, 10, 2, 0)
-        self.checkSet(result, [tid1, tid3])
-        # - one partition
-        result = self.db.getReplicationTIDList(ZERO_TID, MAX_TID, 10, 2, 0)
-        self.checkSet(result, [tid1, tid3])
-        # - another partition
-        result = self.db.getReplicationTIDList(ZERO_TID, MAX_TID, 10, 2, 1)
-        self.checkSet(result, [tid2, tid4])
-        # - min_tid is inclusive
-        result = self.db.getReplicationTIDList(tid3, MAX_TID, 10, 2, 0)
-        self.checkSet(result, [tid3])
-        # - max tid is inclusive
-        result = self.db.getReplicationTIDList(ZERO_TID, tid2, 10, 2, 0)
-        self.checkSet(result, [tid1])
-        # - limit
-        result = self.db.getReplicationTIDList(ZERO_TID, MAX_TID, 1, 2, 0)
-        self.checkSet(result, [tid1])
-
-    def test__getObjectData(self):
-        db = self.db
-        db.setup(reset=True)
-        self.db.setNumPartitions(4)
-        tid0 = self.getNextTID()
-        tid1 = self.getNextTID()
-        tid2 = self.getNextTID()
-        tid3 = self.getNextTID()
-        assert tid0 < tid1 < tid2 < tid3
-        oid1 = self.getOID(1)
-        oid2 = self.getOID(2)
-        oid3 = self.getOID(3)
-        db.storeTransaction(
-            tid1, (
-                (oid1, 0, 0, 'foo', None),
-                (oid2, None, None, None, tid0),
-                (oid3, None, None, None, tid2),
-            ), None, temporary=False)
-        db.storeTransaction(
-            tid2, (
-                (oid1, None, None, None, tid1),
-                (oid2, None, None, None, tid1),
-                (oid3, 0, 0, 'bar', None),
-            ), None, temporary=False)
-
-        original_getObjectData = db._getObjectData
-        def _getObjectData(*args, **kw):
-            call_counter.append(1)
-            return original_getObjectData(*args, **kw)
-        db._getObjectData = _getObjectData
-
-        # NOTE: all tests are done as if values were fetched by _getObject, so
-        # there is already one indirection level.
-
-        # oid1 at tid1: data is immediately found
-        call_counter = []
-        self.assertEqual(
-            db._getObjectData(u64(oid1), u64(tid1), u64(tid3)),
-            (u64(tid1), 0, 0, 'foo'))
-        self.assertEqual(sum(call_counter), 1)
-
-        # oid2 at tid1: missing data in table, raise IndexError on next
-        # recursive call
-        call_counter = []
-        self.assertRaises(IndexError, db._getObjectData, u64(oid2), u64(tid1),
-            u64(tid3))
-        self.assertEqual(sum(call_counter), 2)
-
-        # oid3 at tid1: data_serial grater than row's tid, raise ValueError
-        # on next recursive call - even if data does exist at that tid (see
-        # "oid3 at tid2" case below)
-        call_counter = []
-        self.assertRaises(ValueError, db._getObjectData, u64(oid3), u64(tid1),
-            u64(tid3))
-        self.assertEqual(sum(call_counter), 2)
-        # Same with wrong parameters (tid0 < tid1)
-        call_counter = []
-        self.assertRaises(ValueError, db._getObjectData, u64(oid3), u64(tid1),
-            u64(tid0))
-        self.assertEqual(sum(call_counter), 1)
-        # Same with wrong parameters (tid1 == tid1)
-        call_counter = []
-        self.assertRaises(ValueError, db._getObjectData, u64(oid3), u64(tid1),
-            u64(tid1))
-        self.assertEqual(sum(call_counter), 1)
-
-        # oid1 at tid2: data is found after ons recursive call
-        call_counter = []
-        self.assertEqual(
-            db._getObjectData(u64(oid1), u64(tid2), u64(tid3)),
-            (u64(tid1), 0, 0, 'foo'))
-        self.assertEqual(sum(call_counter), 2)
-
-        # oid2 at tid2: missing data in table, raise IndexError after two
-        # recursive calls
-        call_counter = []
-        self.assertRaises(IndexError, db._getObjectData, u64(oid2), u64(tid2),
-            u64(tid3))
-        self.assertEqual(sum(call_counter), 3)
-
-        # oid3 at tid2: data is immediately found
-        call_counter = []
-        self.assertEqual(
-            db._getObjectData(u64(oid3), u64(tid2), u64(tid3)),
-            (u64(tid2), 0, 0, 'bar'))
-        self.assertEqual(sum(call_counter), 1)
-
-    def test__getDataTIDFromData(self):
-        db = self.db
-        db.setup(reset=True)
-        self.db.setNumPartitions(4)
-        tid1 = self.getNextTID()
-        tid2 = self.getNextTID()
-        oid1 = self.getOID(1)
-        db.storeTransaction(
-            tid1, (
-                (oid1, 0, 0, 'foo', None),
-            ), None, temporary=False)
-        db.storeTransaction(
-            tid2, (
-                (oid1, None, None, None, tid1),
-            ), None, temporary=False)
-
-        self.assertEqual(
-            db._getDataTIDFromData(u64(oid1),
-                db._getObject(u64(oid1), tid=u64(tid1))),
-            (u64(tid1), u64(tid1)))
-        self.assertEqual(
-            db._getDataTIDFromData(u64(oid1),
-                db._getObject(u64(oid1), tid=u64(tid2))),
-            (u64(tid2), u64(tid1)))
-
-    def test__getDataTID(self):
-        db = self.db
-        db.setup(reset=True)
-        self.db.setNumPartitions(4)
-        tid1 = self.getNextTID()
-        tid2 = self.getNextTID()
-        oid1 = self.getOID(1)
-        db.storeTransaction(
-            tid1, (
-                (oid1, 0, 0, 'foo', None),
-            ), None, temporary=False)
-        db.storeTransaction(
-            tid2, (
-                (oid1, None, None, None, tid1),
-            ), None, temporary=False)
-
-        self.assertEqual(
-            db._getDataTID(u64(oid1), tid=u64(tid1)),
-            (u64(tid1), u64(tid1)))
-        self.assertEqual(
-            db._getDataTID(u64(oid1), tid=u64(tid2)),
-            (u64(tid2), u64(tid1)))
-
-    def test_findUndoTID(self):
-        db = self.db
-        db.setup(reset=True)
-        self.db.setNumPartitions(4)
-        tid1 = self.getNextTID()
-        tid2 = self.getNextTID()
-        tid3 = self.getNextTID()
-        tid4 = self.getNextTID()
-        oid1 = self.getOID(1)
-        db.storeTransaction(
-            tid1, (
-                (oid1, 0, 0, 'foo', None),
-            ), None, temporary=False)
-
-        # Undoing oid1 tid1, OK: tid1 is latest
-        # Result: current tid is tid1, data_tid is None (undoing object
-        # creation)
-        self.assertEqual(
-            db.findUndoTID(oid1, tid4, tid1, None),
-            (tid1, None, True))
-
-        # Store a new transaction
-        db.storeTransaction(
-            tid2, (
-                (oid1, 0, 0, 'bar', None),
-            ), None, temporary=False)
-
-        # Undoing oid1 tid2, OK: tid2 is latest
-        # Result: current tid is tid2, data_tid is tid1
-        self.assertEqual(
-            db.findUndoTID(oid1, tid4, tid2, None),
-            (tid2, tid1, True))
-
-        # Undoing oid1 tid1, Error: tid2 is latest
-        # Result: current tid is tid2, data_tid is None
-        self.assertEqual(
-            db.findUndoTID(oid1, tid4, tid1, None),
-            (tid2, None, False))
-
-        # Undoing oid1 tid1 with tid2 being undone in same transaction,
-        # OK: tid1 is latest
-        # Result: current tid is tid4, data_tid is None (undoing object
-        # creation)
-        # Explanation of transaction_object: oid1, no data but a data serial
-        # to tid1
-        self.assertEqual(
-            db.findUndoTID(oid1, tid4, tid1,
-                (u64(oid1), None, None, None, tid1)),
-            (tid4, None, True))
-
-        # Store a new transaction
-        db.storeTransaction(
-            tid3, (
-                (oid1, None, None, None, tid1),
-            ), None, temporary=False)
-
-        # Undoing oid1 tid1, OK: tid3 is latest with tid1 data
-        # Result: current tid is tid3, data_tid is None (undoing object
-        # creation)
-        self.assertEqual(
-            db.findUndoTID(oid1, tid4, tid1, None),
-            (tid3, None, True))
+del StorageDBTests
 
 if __name__ == "__main__":
     unittest.main()





More information about the Neo-report mailing list