[Neo-report] r2296 vincent - in /trunk/neo: ./ storage/ storage/database/ storage/handlers...
nobody at svn.erp5.org
nobody at svn.erp5.org
Sun Sep 5 11:50:47 CEST 2010
Author: vincent
Date: Sun Sep 5 11:50:46 2010
New Revision: 2296
Log:
Use partition's critical TID to avoid unneeded replications.
Modified:
trunk/neo/handler.py
trunk/neo/protocol.py
trunk/neo/storage/database/manager.py
trunk/neo/storage/database/mysqldb.py
trunk/neo/storage/handlers/replication.py
trunk/neo/storage/handlers/storage.py
trunk/neo/storage/replicator.py
trunk/neo/tests/storage/testReplicationHandler.py
trunk/neo/tests/storage/testStorageHandler.py
trunk/neo/tests/storage/testStorageMySQLdb.py
trunk/neo/tests/testProtocol.py
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -256,7 +256,7 @@ class EventHandler(object):
def answerTIDs(self, conn, tid_list):
raise UnexpectedPacketError
- def askTIDsFrom(self, conn, min_tid, length, partition):
+ def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
raise UnexpectedPacketError
def answerTIDsFrom(self, conn, tid_list):
@@ -275,7 +275,8 @@ class EventHandler(object):
def answerObjectHistory(self, conn, oid, history_list):
raise UnexpectedPacketError
- def askObjectHistoryFrom(self, conn, oid, min_serial, length, partition):
+ def askObjectHistoryFrom(self, conn, oid, min_serial, max_serial, length,
+ partition):
raise UnexpectedPacketError
def answerObjectHistoryFrom(self, conn, object_dict):
Modified: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -114,6 +114,7 @@ ZERO_TID = '\0' * 8
ZERO_OID = '\0' * 8
OID_LEN = len(INVALID_OID)
TID_LEN = len(INVALID_TID)
+MAX_TID = '\xff' * 8
UUID_NAMESPACES = {
NodeTypes.STORAGE: 'S',
@@ -1067,10 +1068,10 @@ class AskTIDsFrom(Packet):
Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
S -> S.
"""
- _header_format = '!8sLL'
+ _header_format = '!8s8sLL'
- def _encode(self, min_tid, length, partition):
- return pack(self._header_format, min_tid, length, partition)
+ def _encode(self, min_tid, max_tid, length, partition):
+ return pack(self._header_format, min_tid, max_tid, length, partition)
def _decode(self, body):
return unpack(self._header_format, body) # min_tid, length, partition
@@ -1170,11 +1171,11 @@ class AskObjectHistoryFrom(Packet):
Ask history information for a given object. The order of serials is
ascending, and starts at (or above) min_serial for min_oid. S -> S.
"""
- _header_format = '!8s8sLL'
+ _header_format = '!8s8s8sLL'
- def _encode(self, min_oid, min_serial, length, partition):
- return pack(self._header_format, min_oid, min_serial, length,
- partition)
+ def _encode(self, min_oid, min_serial, max_serial, length, partition):
+ return pack(self._header_format, min_oid, min_serial, max_serial,
+ length, partition)
def _decode(self, body):
# min_oid, min_serial, length, partition
Modified: trunk/neo/storage/database/manager.py
==============================================================================
--- trunk/neo/storage/database/manager.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/manager.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -294,11 +294,11 @@ class DatabaseManager(object):
If there is no such object ID in a database, return None."""
raise NotImplementedError
- def getObjectHistoryFrom(self, oid, min_serial, length, num_partitions,
- partition):
+ def getObjectHistoryFrom(self, oid, min_serial, max_serial, length,
+ num_partitions, partition):
"""Return a dict of length serials grouped by oid at (or above)
- min_oid and min_serial, for given partition, sorted in ascending
- order."""
+ min_oid and min_serial and below max_serial, for given partition,
+ sorted in ascending order."""
raise NotImplementedError
def getTIDList(self, offset, length, num_partitions, partition_list):
@@ -307,11 +307,11 @@ class DatabaseManager(object):
to filter out non-applicable TIDs."""
raise NotImplementedError
- def getReplicationTIDList(self, min_tid, length, num_partitions,
+ def getReplicationTIDList(self, min_tid, max_tid, length, num_partitions,
partition):
"""Return a list of TIDs in ascending order from an initial tid value,
- at most the specified length. The partition number is passed to filter
- out non-applicable TIDs."""
+ at most the specified length up to max_tid. The partition number is
+ passed to filter out non-applicable TIDs."""
raise NotImplementedError
def pack(self, tid, updateObjectDataForPack):
Modified: trunk/neo/storage/database/mysqldb.py
==============================================================================
--- trunk/neo/storage/database/mysqldb.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/mysqldb.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -649,20 +649,23 @@ class MySQLDatabaseManager(DatabaseManag
return result
return None
- def getObjectHistoryFrom(self, min_oid, min_serial, length, num_partitions,
- partition):
+ def getObjectHistoryFrom(self, min_oid, min_serial, max_serial, length,
+ num_partitions, partition):
q = self.query
u64 = util.u64
p64 = util.p64
min_oid = u64(min_oid)
min_serial = u64(min_serial)
+ max_serial = u64(max_serial)
r = q('SELECT oid, serial FROM obj '
'WHERE ((oid = %(min_oid)d AND serial >= %(min_serial)d) OR '
'oid > %(min_oid)d) AND '
- 'MOD(oid, %(num_partitions)d) = %(partition)s '
+ 'MOD(oid, %(num_partitions)d) = %(partition)s AND '
+ 'serial <= %(max_serial)d '
'ORDER BY oid ASC, serial ASC LIMIT %(length)d' % {
'min_oid': min_oid,
'min_serial': min_serial,
+ 'max_serial': max_serial,
'length': length,
'num_partitions': num_partitions,
'partition': partition,
@@ -685,19 +688,24 @@ class MySQLDatabaseManager(DatabaseManag
offset, length))
return [util.p64(t[0]) for t in r]
- def getReplicationTIDList(self, min_tid, length, num_partitions,
+ def getReplicationTIDList(self, min_tid, max_tid, length, num_partitions,
partition):
q = self.query
+ u64 = util.u64
+ p64 = util.p64
+ min_tid = u64(min_tid)
+ max_tid = u64(max_tid)
r = q("""SELECT tid FROM trans WHERE
MOD(tid, %(num_partitions)d) = %(partition)d
- AND tid >= %(min_tid)d
+ AND tid >= %(min_tid)d AND tid <= %(max_tid)d
ORDER BY tid ASC LIMIT %(length)d""" % {
'num_partitions': num_partitions,
'partition': partition,
- 'min_tid': util.u64(min_tid),
+ 'min_tid': min_tid,
+ 'max_tid': max_tid,
'length': length,
})
- return [util.p64(t[0]) for t in r]
+ return [p64(t[0]) for t in r]
def _updatePackFuture(self, oid, orig_serial, max_serial,
updateObjectDataForPack):
Modified: trunk/neo/storage/handlers/replication.py
==============================================================================
--- trunk/neo/storage/handlers/replication.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/replication.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -165,16 +165,21 @@ class ReplicationHandler(EventHandler):
def _doAskTIDsFrom(self, min_tid, length):
replicator = self.app.replicator
- partition = replicator.current_partition.getRID()
- replicator.getTIDsFrom(min_tid, length, partition)
- return Packets.AskTIDsFrom(min_tid, length, partition)
+ partition = replicator.current_partition
+ partition_id = partition.getRID()
+ max_tid = partition.getCriticalTID()
+ replicator.getTIDsFrom(min_tid, max_tid, length, partition_id)
+ return Packets.AskTIDsFrom(min_tid, max_tid, length, partition_id)
def _doAskObjectHistoryFrom(self, min_oid, min_serial, length):
replicator = self.app.replicator
- partition = replicator.current_partition.getRID()
- replicator.getObjectHistoryFrom(min_oid, min_serial, length, partition)
- return Packets.AskObjectHistoryFrom(min_oid, min_serial, length,
- partition)
+ partition = replicator.current_partition
+ partition_id = partition.getRID()
+ max_serial = partition.getCriticalTID()
+ replicator.getObjectHistoryFrom(min_oid, min_serial, max_serial,
+ length, partition_id)
+ return Packets.AskObjectHistoryFrom(min_oid, min_serial, max_serial,
+ length, partition_id)
@checkConnectionIsReplicatorConnection
def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
@@ -200,7 +205,8 @@ class ReplicationHandler(EventHandler):
p = self._doAskCheckTIDRange(min_tid, min(length / 2,
count + 1))
if p is None:
- if count == length:
+ if count == length and \
+ max_tid < replicator.current_partition.getCriticalTID():
# Go on with next chunk
p = self._doAskCheckTIDRange(add64(max_tid, 1))
else:
Modified: trunk/neo/storage/handlers/storage.py
==============================================================================
--- trunk/neo/storage/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/storage/handlers/storage.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -30,17 +30,17 @@ class StorageOperationHandler(BaseClient
tid = app.dm.getLastTID()
conn.answer(Packets.AnswerLastIDs(oid, tid, app.pt.getID()))
- def askTIDsFrom(self, conn, min_tid, length, partition):
+ def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
app = self.app
- tid_list = app.dm.getReplicationTIDList(min_tid, length,
+ tid_list = app.dm.getReplicationTIDList(min_tid, max_tid, length,
app.pt.getPartitions(), partition)
conn.answer(Packets.AnswerTIDsFrom(tid_list))
- def askObjectHistoryFrom(self, conn, min_oid, min_serial, length,
- partition):
+ def askObjectHistoryFrom(self, conn, min_oid, min_serial, max_serial,
+ length, partition):
app = self.app
- object_dict = app.dm.getObjectHistoryFrom(min_oid, min_serial, length,
- app.pt.getPartitions(), partition)
+ object_dict = app.dm.getObjectHistoryFrom(min_oid, min_serial, max_serial,
+ length, app.pt.getPartitions(), partition)
conn.answer(Packets.AnswerObjectHistoryFrom(object_dict))
def askCheckTIDRange(self, conn, min_tid, length, partition):
Modified: trunk/neo/storage/replicator.py
==============================================================================
--- trunk/neo/storage/replicator.py [iso-8859-1] (original)
+++ trunk/neo/storage/replicator.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -361,17 +361,18 @@ class Replicator(object):
app.dm.checkSerialRange, (min_oid, min_serial, length,
app.pt.getPartitions(), partition))
- def getTIDsFrom(self, min_tid, length, partition):
+ def getTIDsFrom(self, min_tid, max_tid, length, partition):
app = self.app
self._addTask('TIDsFrom',
- app.dm.getReplicationTIDList, (min_tid, length,
+ app.dm.getReplicationTIDList, (min_tid, max_tid, length,
app.pt.getPartitions(), partition))
- def getObjectHistoryFrom(self, min_oid, min_serial, length, partition):
+ def getObjectHistoryFrom(self, min_oid, min_serial, max_serial, length,
+ partition):
app = self.app
self._addTask('ObjectHistoryFrom',
- app.dm.getObjectHistoryFrom, (min_oid, min_serial, length,
- app.pt.getPartitions(), partition))
+ app.dm.getObjectHistoryFrom, (min_oid, min_serial, max_serial,
+ length, app.pt.getPartitions(), partition))
def _getCheckResult(self, key):
return self.task_dict.pop(key).getResult()
Modified: trunk/neo/tests/storage/testReplicationHandler.py
==============================================================================
--- trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testReplicationHandler.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -240,11 +240,13 @@ class StorageReplicationHandlerTests(Neo
def test_answerCheckTIDRangeIdenticalChunkWithNext(self):
min_tid = self.getNextTID()
max_tid = self.getNextTID()
+ critical_tid = self.getNextTID()
+ assert max_tid < critical_tid
length = RANGE_LENGTH / 2
rid = 12
conn = self.getFakeConnection()
app = self.getApp(tid_check_result=(length, 0, max_tid), rid=rid,
- conn=conn)
+ conn=conn, critical_tid=critical_tid)
handler = ReplicationHandler(app)
# Peer has the same data as we have: length, checksum and max_tid
# match.
@@ -259,6 +261,31 @@ class StorageReplicationHandlerTests(Neo
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_tid, plength, ppartition)
+ def test_answerCheckTIDRangeIdenticalChunkAboveCriticalTID(self):
+ critical_tid = self.getNextTID()
+ min_tid = self.getNextTID()
+ max_tid = self.getNextTID()
+ assert critical_tid < max_tid
+ length = RANGE_LENGTH / 2
+ rid = 12
+ conn = self.getFakeConnection()
+ app = self.getApp(tid_check_result=(length, 0, max_tid), rid=rid,
+ conn=conn, critical_tid=critical_tid)
+ handler = ReplicationHandler(app)
+ # Peer has the same data as we have: length, checksum and max_tid
+ # match.
+ handler.answerCheckTIDRange(conn, min_tid, length, length, 0, max_tid)
+ # Result: go on with object range checks
+ pmin_oid, pmin_serial, plength, ppartition = self.checkAskPacket(conn,
+ Packets.AskCheckSerialRange, decode=True)
+ self.assertEqual(pmin_oid, ZERO_OID)
+ self.assertEqual(pmin_serial, ZERO_TID)
+ self.assertEqual(plength, RANGE_LENGTH)
+ self.assertEqual(ppartition, rid)
+ calls = app.replicator.mockGetNamedCalls('checkSerialRange')
+ self.assertEqual(len(calls), 1)
+ calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
+
def test_answerCheckTIDRangeIdenticalChunkWithoutNext(self):
min_tid = self.getNextTID()
max_tid = self.getNextTID()
@@ -307,11 +334,12 @@ class StorageReplicationHandlerTests(Neo
def test_answerCheckTIDRangeDifferentSmallChunkWithNext(self):
min_tid = self.getNextTID()
max_tid = self.getNextTID()
+ critical_tid = self.getNextTID()
length = MIN_RANGE_LENGTH - 1
rid = 12
conn = self.getFakeConnection()
app = self.getApp(tid_check_result=(length - 5, 0, max_tid), rid=rid,
- conn=conn)
+ conn=conn, critical_tid=critical_tid)
handler = ReplicationHandler(app)
# Peer has different data
handler.answerCheckTIDRange(conn, min_tid, length, length, 0, max_tid)
@@ -322,13 +350,14 @@ class StorageReplicationHandlerTests(Neo
tid_packet = tid_call.getParam(0)
next_packet = next_call.getParam(0)
self.assertEqual(tid_packet.getType(), Packets.AskTIDsFrom)
- pmin_tid, plength, ppartition = tid_packet.decode()
+ pmin_tid, pmax_tid, plength, ppartition = tid_packet.decode()
self.assertEqual(pmin_tid, min_tid)
+ self.assertEqual(pmax_tid, critical_tid)
self.assertEqual(plength, length)
self.assertEqual(ppartition, rid)
calls = app.replicator.mockGetNamedCalls('getTIDsFrom')
self.assertEqual(len(calls), 1)
- calls[0].checkArgs(pmin_tid, plength, ppartition)
+ calls[0].checkArgs(pmin_tid, pmax_tid, plength, ppartition)
self.assertEqual(next_packet.getType(), Packets.AskCheckTIDRange)
pmin_tid, plength, ppartition = next_packet.decode()
self.assertEqual(pmin_tid, add64(max_tid, 1))
@@ -341,11 +370,12 @@ class StorageReplicationHandlerTests(Neo
def test_answerCheckTIDRangeDifferentSmallChunkWithoutNext(self):
min_tid = self.getNextTID()
max_tid = self.getNextTID()
+ critical_tid = self.getNextTID()
length = MIN_RANGE_LENGTH - 1
rid = 12
conn = self.getFakeConnection()
app = self.getApp(tid_check_result=(length - 5, 0, max_tid), rid=rid,
- conn=conn)
+ conn=conn, critical_tid=critical_tid)
handler = ReplicationHandler(app)
# Peer has different data, and less than length
handler.answerCheckTIDRange(conn, min_tid, length, length - 1, 0,
@@ -357,13 +387,14 @@ class StorageReplicationHandlerTests(Neo
tid_packet = tid_call.getParam(0)
next_packet = next_call.getParam(0)
self.assertEqual(tid_packet.getType(), Packets.AskTIDsFrom)
- pmin_tid, plength, ppartition = tid_packet.decode()
+ pmin_tid, pmax_tid, plength, ppartition = tid_packet.decode()
self.assertEqual(pmin_tid, min_tid)
+ self.assertEqual(pmax_tid, critical_tid)
self.assertEqual(plength, length - 1)
self.assertEqual(ppartition, rid)
calls = app.replicator.mockGetNamedCalls('getTIDsFrom')
self.assertEqual(len(calls), 1)
- calls[0].checkArgs(pmin_tid, plength, ppartition)
+ calls[0].checkArgs(pmin_tid, pmax_tid, plength, ppartition)
self.assertEqual(next_packet.getType(), Packets.AskCheckSerialRange)
pmin_oid, pmin_serial, plength, ppartition = next_packet.decode()
self.assertEqual(pmin_oid, ZERO_OID)
@@ -448,11 +479,12 @@ class StorageReplicationHandlerTests(Neo
max_oid = self.getOID(10)
min_serial = self.getNextTID()
max_serial = self.getNextTID()
+ critical_tid = self.getNextTID()
length = MIN_RANGE_LENGTH - 1
rid = 12
conn = self.getFakeConnection()
app = self.getApp(tid_check_result=(length - 5, 0, max_oid, 1,
- max_serial), rid=rid, conn=conn)
+ max_serial), rid=rid, conn=conn, critical_tid=critical_tid)
handler = ReplicationHandler(app)
# Peer has different data
handler.answerCheckSerialRange(conn, min_oid, min_serial, length,
@@ -464,14 +496,17 @@ class StorageReplicationHandlerTests(Neo
serial_packet = serial_call.getParam(0)
next_packet = next_call.getParam(0)
self.assertEqual(serial_packet.getType(), Packets.AskObjectHistoryFrom)
- pmin_oid, pmin_serial, plength, ppartition = serial_packet.decode()
+ pmin_oid, pmin_serial, pmax_serial, plength, ppartition = \
+ serial_packet.decode()
self.assertEqual(pmin_oid, min_oid)
self.assertEqual(pmin_serial, min_serial)
+ self.assertEqual(pmax_serial, critical_tid)
self.assertEqual(plength, length)
self.assertEqual(ppartition, rid)
calls = app.replicator.mockGetNamedCalls('getObjectHistoryFrom')
self.assertEqual(len(calls), 1)
- calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
+ calls[0].checkArgs(pmin_oid, pmin_serial, pmax_serial, plength,
+ ppartition)
self.assertEqual(next_packet.getType(), Packets.AskCheckSerialRange)
pmin_oid, pmin_serial, plength, ppartition = next_packet.decode()
self.assertEqual(pmin_oid, max_oid)
@@ -487,25 +522,29 @@ class StorageReplicationHandlerTests(Neo
max_oid = self.getOID(10)
min_serial = self.getNextTID()
max_serial = self.getNextTID()
+ critical_tid = self.getNextTID()
length = MIN_RANGE_LENGTH - 1
rid = 12
conn = self.getFakeConnection()
app = self.getApp(tid_check_result=(length - 5, 0, max_oid,
- 1, max_serial), rid=rid, conn=conn)
+ 1, max_serial), rid=rid, conn=conn, critical_tid=critical_tid)
handler = ReplicationHandler(app)
# Peer has different data, and less than length
handler.answerCheckSerialRange(conn, min_oid, min_serial, length,
length - 1, 0, max_oid, 1, max_serial)
# Result: ask tid list, and mark replication as done
- pmin_oid, pmin_serial, plength, ppartition = self.checkAskPacket(conn,
- Packets.AskObjectHistoryFrom, decode=True)
+ pmin_oid, pmin_serial, pmax_serial, plength, ppartition = \
+ self.checkAskPacket(conn, Packets.AskObjectHistoryFrom,
+ decode=True)
self.assertEqual(pmin_oid, min_oid)
self.assertEqual(pmin_serial, min_serial)
+ self.assertEqual(pmax_serial, critical_tid)
self.assertEqual(plength, length - 1)
self.assertEqual(ppartition, rid)
calls = app.replicator.mockGetNamedCalls('getObjectHistoryFrom')
self.assertEqual(len(calls), 1)
- calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
+ calls[0].checkArgs(pmin_oid, pmin_serial, pmax_serial, plength,
+ ppartition)
self.assertTrue(app.replicator.replication_done)
if __name__ == "__main__":
Modified: trunk/neo/tests/storage/testStorageHandler.py
==============================================================================
--- trunk/neo/tests/storage/testStorageHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageHandler.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -119,15 +119,17 @@ class StorageStorageHandlerTests(NeoTest
self.app.dm = Mock({'getReplicationTIDList': (INVALID_TID, )})
self.app.pt = Mock({'getPartitions': 1})
tid = self.getNextTID()
- self.operation.askTIDsFrom(conn, tid, 2, 1)
+ tid2 = self.getNextTID()
+ self.operation.askTIDsFrom(conn, tid, tid2, 2, 1)
calls = self.app.dm.mockGetNamedCalls('getReplicationTIDList')
self.assertEquals(len(calls), 1)
- calls[0].checkArgs(tid, 2, 1, 1)
+ calls[0].checkArgs(tid, tid2, 2, 1, 1)
self.checkAnswerTidsFrom(conn)
def test_26_askObjectHistoryFrom(self):
min_oid = self.getOID(2)
min_serial = self.getNextTID()
+ max_serial = self.getNextTID()
length = 4
partition = 8
num_partitions = 16
@@ -137,13 +139,13 @@ class StorageStorageHandlerTests(NeoTest
self.app.pt = Mock({
'getPartitions': num_partitions,
})
- self.operation.askObjectHistoryFrom(conn, min_oid, min_serial, length,
- partition)
+ self.operation.askObjectHistoryFrom(conn, min_oid, min_serial,
+ max_serial, length, partition)
self.checkAnswerObjectHistoryFrom(conn)
calls = self.app.dm.mockGetNamedCalls('getObjectHistoryFrom')
self.assertEquals(len(calls), 1)
- calls[0].checkArgs(min_oid, min_serial, length, num_partitions,
- partition)
+ calls[0].checkArgs(min_oid, min_serial, max_serial, length,
+ num_partitions, partition)
def test_askCheckTIDRange(self):
count = 1
Modified: trunk/neo/tests/storage/testStorageMySQLdb.py
==============================================================================
--- trunk/neo/tests/storage/testStorageMySQLdb.py [iso-8859-1] (original)
+++ trunk/neo/tests/storage/testStorageMySQLdb.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -19,7 +19,7 @@ import unittest
import MySQLdb
from mock import Mock
from neo.util import dump, p64, u64
-from neo.protocol import CellStates, INVALID_PTID, ZERO_OID, ZERO_TID
+from neo.protocol import CellStates, INVALID_PTID, ZERO_OID, ZERO_TID, MAX_TID
from neo.tests import NeoTestBase
from neo.exception import DatabaseFailure
from neo.storage.database.mysqldb import MySQLDatabaseManager
@@ -516,29 +516,40 @@ class StorageMySQSLdbTests(NeoTestBase):
self.db.finishTransaction(tid3)
self.db.finishTransaction(tid4)
# Check full result
- result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, 10, 1, 0)
+ result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, MAX_TID, 10,
+ 1, 0)
self.assertEqual(result, {
oid1: [tid1, tid3],
oid2: [tid2, tid4],
})
# Lower bound is inclusive
- result = self.db.getObjectHistoryFrom(oid1, tid1, 10, 1, 0)
+ result = self.db.getObjectHistoryFrom(oid1, tid1, MAX_TID, 10, 1, 0)
self.assertEqual(result, {
oid1: [tid1, tid3],
oid2: [tid2, tid4],
})
+ # Upper bound is inclusive
+ result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, tid3, 10,
+ 1, 0)
+ self.assertEqual(result, {
+ oid1: [tid1, tid3],
+ oid2: [tid2],
+ })
# Length is total number of serials
- result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, 3, 1, 0)
+ result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, MAX_TID, 3,
+ 1, 0)
self.assertEqual(result, {
oid1: [tid1, tid3],
oid2: [tid2],
})
# Partition constraints are honored
- result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, 10, 2, 0)
+ result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, MAX_TID, 10,
+ 2, 0)
self.assertEqual(result, {
oid1: [tid1, tid3],
})
- result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, 10, 2, 1)
+ result = self.db.getObjectHistoryFrom(ZERO_OID, ZERO_TID, MAX_TID, 10,
+ 2, 1)
self.assertEqual(result, {
oid2: [tid2, tid4],
})
@@ -575,19 +586,21 @@ class StorageMySQSLdbTests(NeoTestBase):
def test_getReplicationTIDList(self):
tid1, tid2, tid3, tid4 = self._storeTransactions(4)
# get tids
- result = self.db.getReplicationTIDList(tid1, 4, 1, 0)
+ # - all
+ result = self.db.getReplicationTIDList(ZERO_TID, MAX_TID, 10, 1, 0)
self.checkSet(result, [tid1, tid2, tid3, tid4])
- result = self.db.getReplicationTIDList(tid1, 4, 2, 0)
+ # - one partition
+ result = self.db.getReplicationTIDList(ZERO_TID, MAX_TID, 10, 2, 0)
self.checkSet(result, [tid1, tid3])
- result = self.db.getReplicationTIDList(tid1, 4, 3, 0)
- self.checkSet(result, [tid1, tid4])
- # get a subset of tids
- result = self.db.getReplicationTIDList(tid3, 4, 1, 0)
+ # - min_tid is inclusive
+ result = self.db.getReplicationTIDList(tid3, MAX_TID, 10, 1, 0)
self.checkSet(result, [tid3, tid4])
- result = self.db.getReplicationTIDList(tid1, 2, 1, 0)
+ # - max tid is inclusive
+ result = self.db.getReplicationTIDList(ZERO_TID, tid2, 10, 1, 0)
+ self.checkSet(result, [tid1, tid2])
+ # - limit
+ result = self.db.getReplicationTIDList(ZERO_TID, MAX_TID, 2, 1, 0)
self.checkSet(result, [tid1, tid2])
- result = self.db.getReplicationTIDList(tid1, 1, 3, 1)
- self.checkSet(result, [tid2])
def test__getObjectData(self):
db = self.db
Modified: trunk/neo/tests/testProtocol.py
==============================================================================
--- trunk/neo/tests/testProtocol.py [iso-8859-1] (original)
+++ trunk/neo/tests/testProtocol.py [iso-8859-1] Sun Sep 5 11:50:46 2010
@@ -595,9 +595,11 @@ class ProtocolTests(NeoTestBase):
def test_AskTIDsFrom(self):
tid = self.getNextTID()
- p = Packets.AskTIDsFrom(tid, 1000, 5)
- min_tid, length, partition = p.decode()
+ tid2 = self.getNextTID()
+ p = Packets.AskTIDsFrom(tid, tid2, 1000, 5)
+ min_tid, max_tid, length, partition = p.decode()
self.assertEqual(min_tid, tid)
+ self.assertEqual(max_tid, tid2)
self.assertEqual(length, 1000)
self.assertEqual(partition, 5)
@@ -607,12 +609,15 @@ class ProtocolTests(NeoTestBase):
def test_AskObjectHistoryFrom(self):
oid = self.getOID(1)
min_serial = self.getNextTID()
+ max_serial = self.getNextTID()
length = 5
partition = 4
- p = Packets.AskObjectHistoryFrom(oid, min_serial, length, partition)
- p_oid, p_min_serial, p_length, p_partition = p.decode()
+ p = Packets.AskObjectHistoryFrom(oid, min_serial, max_serial, length,
+ partition)
+ p_oid, p_min_serial, p_max_serial, p_length, p_partition = p.decode()
self.assertEqual(p_oid, oid)
self.assertEqual(p_min_serial, min_serial)
+ self.assertEqual(p_max_serial, max_serial)
self.assertEqual(p_length, length)
self.assertEqual(p_partition, partition)
More information about the Neo-report
mailing list