[Neo-report] r2451 vincent - in /trunk: neo/storage/database/ neo/tests/storage/ tools/
nobody at svn.erp5.org
nobody at svn.erp5.org
Tue Nov 9 20:41:26 CET 2010
Author: vincent
Date: Tue Nov 9 20:41:26 2010
New Revision: 2451
Log:
Add experimental ZODB.BTrees-based database back-end.
Added:
trunk/neo/storage/database/btree.py
trunk/neo/tests/storage/testStorageBTree.py
Modified:
trunk/neo/storage/database/__init__.py
trunk/tools/runner
Modified: trunk/neo/storage/database/__init__.py
==============================================================================
--- trunk/neo/storage/database/__init__.py [iso-8859-1] (original)
+++ trunk/neo/storage/database/__init__.py [iso-8859-1] Tue Nov 9 20:41:26 2010
@@ -27,6 +27,14 @@ except ImportError:
else:
DATABASE_MANAGER_DICT['MySQL'] = MySQLDatabaseManager
+try:
+ from neo.storage.database.btree import BTreeDatabaseManager
+except ImportError:
+ pass
+else:
+ # XXX: warning: name might change in the future.
+ DATABASE_MANAGER_DICT['BTree'] = BTreeDatabaseManager
+
if not DATABASE_MANAGER_DICT:
raise ImportError('No database back-end available.')
Added: trunk/neo/storage/database/btree.py
==============================================================================
--- trunk/neo/storage/database/btree.py (added)
+++ trunk/neo/storage/database/btree.py [iso-8859-1] Tue Nov 9 20:41:26 2010
@@ -0,0 +1,680 @@
+#
+# Copyright (C) 2010 Nexedi SA
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+"""
+Naive b-tree implementation.
+Simple, though not so well tested.
+Not persistent ! (no data retained after process exit)
+"""
+
+from BTrees.OOBTree import OOBTree as _OOBTree
+import neo
+
+from neo.storage.database import DatabaseManager
+from neo.protocol import CellStates
+from neo import util
+
+# The only purpose of this value (and code using it) is to avoid creating
+# arbitrarily-long lists of values when cleaning up dictionaries.
+KEY_BATCH_SIZE = 1000
+
+# Keep dropped trees in memory to avoid instantiating new ones when not needed.
+TREE_POOL = []
+# How many empty BTree instances to keep in RAM.
+MAX_TREE_POOL_SIZE = 100
+
+def OOBTree():
+    """Return an OOBTree instance, reusing a pooled empty tree when
+    one is available (see prune)."""
+    try:
+        result = TREE_POOL.pop()
+    except IndexError:
+        # Pool exhausted: instantiate a fresh tree.
+        result = _OOBTree()
+    return result
+
+def prune(tree):
+    """Recycle a tree that is about to be dropped: clear its content
+    and return it to TREE_POOL, unless the pool is already full."""
+    if len(TREE_POOL) < MAX_TREE_POOL_SIZE:
+        tree.clear()
+        TREE_POOL.append(tree)
+
+class CreationUndone(Exception):
+    """Raised when object data is looked up through a None
+    value_serial, i.e. the object's creation was undone."""
+    pass
+
+def iterObjSerials(obj):
+    """Yield every serial (tid) present in an oid->(tid->record) tree."""
+    for tserial in obj.values():
+        for serial in tserial.keys():
+            yield serial
+
+def descItems(tree):
+    """Iterate over (key, value) pairs of a BTree in descending key
+    order. Yields nothing for an empty tree."""
+    try:
+        key = tree.maxKey()
+    except ValueError:
+        # Empty tree.
+        pass
+    else:
+        while True:
+            yield (key, tree[key])
+            try:
+                # Greatest key strictly below the current one; assumes
+                # integer keys -- TODO confirm.
+                key = tree.maxKey(key - 1)
+            except ValueError:
+                break
+
+def descKeys(tree):
+    """Iterate over keys of a BTree in descending order. Yields
+    nothing for an empty tree."""
+    try:
+        key = tree.maxKey()
+    except ValueError:
+        # Empty tree.
+        pass
+    else:
+        while True:
+            yield key
+            try:
+                # Greatest key strictly below the current one; assumes
+                # integer keys -- TODO confirm.
+                key = tree.maxKey(key - 1)
+            except ValueError:
+                break
+
+class BTreeDatabaseManager(DatabaseManager):
+    """Volatile storage back-end keeping all data in ZODB BTrees in RAM.
+
+    Nothing is persisted: all data is lost at process exit.
+    """
+
+    # oid -> (tid -> (compression, checksum, data, value_serial))
+    _obj = None
+    # tid -> (oid_tuple, user, desc, ext, packed)
+    _trans = None
+    # Same layouts as _obj/_trans, for unfinished (temporary) data.
+    _tobj = None
+    _ttrans = None
+    # (offset, uuid) -> cell state
+    _pt = None
+    # configuration key -> value
+    _config = None
+
+    def __init__(self, database):
+        # 'database' is accepted for API compatibility with other
+        # back-ends but not used: this back-end is purely in-memory.
+        super(BTreeDatabaseManager, self).__init__()
+        self.setup(reset=1)
+
+    def setup(self, reset=0):
+        """(Re)initialise storage. With reset, drop all data."""
+        if reset:
+            self._obj = OOBTree()
+            self._trans = OOBTree()
+            self.dropUnfinishedData()
+            self._pt = {}
+            self._config = {}
+
+    def _begin(self):
+        # No transactional backing store: begin/commit/rollback are
+        # no-ops for this back-end.
+        pass
+
+    def _commit(self):
+        pass
+
+    def _rollback(self):
+        pass
+
+    def getConfiguration(self, key):
+        """Return the configuration value for key (raises KeyError
+        when unset)."""
+        return self._config[key]
+
+    def _setConfiguration(self, key, value):
+        self._config[key] = value
+
+    def _setPackTID(self, tid):
+        # Remember the last pack tid (stored as an unpacked integer).
+        self._setConfiguration('_pack_tid', tid)
+
+    def _getPackTID(self):
+        """Return the last pack tid, or -1 if no pack ever happened."""
+        try:
+            result = int(self.getConfiguration('_pack_tid'))
+        except KeyError:
+            result = -1
+        return result
+
+    def getPartitionTable(self):
+        """Return the partition table as a list of
+        (offset, uuid, state) tuples."""
+        pt = []
+        append = pt.append
+        for (offset, uuid), state in self._pt.iteritems():
+            append((offset, uuid, state))
+        return pt
+
+    def getLastTID(self, all=True):
+        """Return the last TID (packed), or None when the store is
+        empty. With all=True, unfinished transactions and unfinished
+        object records are also considered."""
+        try:
+            ltid = self._trans.maxKey()
+        except ValueError:
+            # No finished transaction yet.
+            ltid = None
+        if all:
+            try:
+                tmp_ltid = self._ttrans.maxKey()
+            except ValueError:
+                tmp_ltid = None
+            tmp_serial = None
+            for tserial in self._tobj.values():
+                try:
+                    max_tmp_serial = tserial.maxKey()
+                except ValueError:
+                    pass
+                else:
+                    # Relies on Python 2 ordering: None < any integer.
+                    tmp_serial = max(tmp_serial, max_tmp_serial)
+            ltid = max(ltid, tmp_ltid, tmp_serial)
+        if ltid is not None:
+            ltid = util.p64(ltid)
+        return ltid
+
+    def getUnfinishedTIDList(self):
+        """Return the packed TIDs of all unfinished transactions."""
+        p64 = util.p64
+        tid_set = set(p64(x) for x in self._ttrans.keys())
+        tid_set.update(p64(x) for x in iterObjSerials(self._tobj))
+        return list(tid_set)
+
+    def objectPresent(self, oid, tid, all=True):
+        """Tell whether a record exists for packed oid at packed tid.
+        With all=True, unfinished records are also searched."""
+        u64 = util.u64
+        oid = u64(oid)
+        tid = u64(tid)
+        try:
+            result = self._obj[oid].has_key(tid)
+        except KeyError:
+            # oid unknown in finished data: maybe in temporary data.
+            if all:
+                try:
+                    result = self._tobj[oid].has_key(tid)
+                except KeyError:
+                    result = False
+            else:
+                result = False
+        return result
+
+ def _getObjectData(self, oid, value_serial, tid):
+ if value_serial is None:
+ raise CreationUndone
+ if value_serial >= tid:
+ raise ValueError, "Incorrect value reference found for " \
+ "oid %d at tid %d: reference = %d" % (oid, value_serial, tid)
+ try:
+ tserial = self._obj[oid]
+ except KeyError:
+ raise IndexError(oid)
+ try:
+ compression, checksum, value, next_value_serial = tserial[
+ value_serial]
+ except KeyError:
+ raise IndexError(value_serial)
+ if value is None:
+ neo.logging.info("Multiple levels of indirection when " \
+ "searching for object data for oid %d at tid %d. This " \
+ "causes suboptimal performance." % (oid, value_serial))
+ value_serial, compression, checksum, value = self._getObjectData(
+ oid, next_value_serial, value_serial)
+ return value_serial, compression, checksum, value
+
+    def _getObject(self, oid, tid=None, before_tid=None):
+        """Return a record for unpacked oid, or None.
+
+        tid: exact serial to fetch.
+        before_tid: exclusive upper bound; the latest serial strictly
+        below it is fetched. With neither, the latest serial is used.
+        Returns (serial, next_serial, compression, checksum, data,
+        value_serial); next_serial is only computed for before_tid
+        lookups.
+        """
+        tserial = self._obj.get(oid)
+        if tserial is None:
+            result = None
+        else:
+            if tid is None:
+                if before_tid is None:
+                    try:
+                        tid = tserial.maxKey()
+                    except ValueError:
+                        tid = None
+                else:
+                    # maxKey bound is inclusive; shift down to make
+                    # before_tid an exclusive bound.
+                    before_tid -= 1
+                    try:
+                        tid = tserial.maxKey(before_tid)
+                    except ValueError:
+                        tid = None
+            result = tserial.get(tid, None)
+            if result:
+                compression, checksum, data, value_serial = result
+                if before_tid is None:
+                    next_serial = None
+                else:
+                    try:
+                        next_serial = tserial.minKey(tid + 1)
+                    except ValueError:
+                        next_serial = None
+                result = (tid, next_serial, compression, checksum, data,
+                    value_serial)
+        return result
+
+    def doSetPartitionTable(self, ptid, cell_list, reset):
+        """Apply cell_list to the partition table; with reset, start
+        from an empty table. DISCARDED cells are removed."""
+        pt = self._pt
+        if reset:
+            pt.clear()
+        for offset, uuid, state in cell_list:
+            # TODO: this logic should move out of database manager
+            # add 'dropCells(cell_list)' to API and use one query
+            key = (offset, uuid)
+            if state == CellStates.DISCARDED:
+                pt.pop(key, None)
+            else:
+                pt[key] = int(state)
+        self.setPTID(ptid)
+
+    def changePartitionTable(self, ptid, cell_list):
+        # Incremental update of the partition table.
+        self.doSetPartitionTable(ptid, cell_list, False)
+
+    def setPartitionTable(self, ptid, cell_list):
+        # Full replacement of the partition table.
+        self.doSetPartitionTable(ptid, cell_list, True)
+
+    def _dropPartitions(self, num_partitions, offset_list, tree):
+        """Delete from tree every entry whose key maps (modulo
+        num_partitions) to one of the given partitions, batching
+        deletions by KEY_BATCH_SIZE to bound the key list size."""
+        offset_list = frozenset(offset_list)
+        last = 0
+        while True:
+            to_drop = []
+            append = to_drop.append
+            for key in tree.keys(min=last):
+                if key % num_partitions in offset_list:
+                    append(key)
+                    if len(to_drop) >= KEY_BATCH_SIZE:
+                        last = key + 1
+                        break
+            if to_drop:
+                for key in to_drop:
+                    # Recycle the sub-tree before removing it.
+                    prune(tree[key])
+                    del tree[key]
+            else:
+                break
+
+    def dropPartitions(self, num_partitions, offset_list):
+        # Drop both object and transaction data of given partitions.
+        self._dropPartitions(num_partitions, offset_list, self._obj)
+        self._dropPartitions(num_partitions, offset_list, self._trans)
+
+    def dropUnfinishedData(self):
+        # Discard all temporary (unfinished) data.
+        self._tobj = OOBTree()
+        self._ttrans = OOBTree()
+
+    def storeTransaction(self, tid, object_list, transaction, temporary=True):
+        """Store object records and (optionally) transaction metadata.
+
+        tid: packed transaction id.
+        object_list: (oid, compression, checksum, data, value_serial)
+        tuples; data None marks an indirect record pointing at
+        value_serial (or an undone creation when value_serial is also
+        None).
+        transaction: (oid_list, user, desc, ext, packed) or None.
+        temporary: store in the unfinished trees instead of the
+        finished ones.
+        """
+        u64 = util.u64
+        tid = u64(tid)
+        if temporary:
+            obj = self._tobj
+            trans = self._ttrans
+        else:
+            obj = self._obj
+            trans = self._trans
+        for oid, compression, checksum, data, value_serial in object_list:
+            oid = u64(oid)
+            if data is None:
+                # Indirect record: compression/checksum carry no info.
+                compression = checksum = data
+            else:
+                # TODO: unit-test this raise
+                if value_serial is not None:
+                    raise ValueError, 'Either data or value_serial ' \
+                        'must be None (oid %d, tid %d)' % (oid, tid)
+            try:
+                tserial = obj[oid]
+            except KeyError:
+                tserial = obj[oid] = OOBTree()
+            if value_serial is not None:
+                value_serial = u64(value_serial)
+            tserial[tid] = (compression, checksum, data, value_serial)
+
+        if transaction is not None:
+            oid_list, user, desc, ext, packed = transaction
+            trans[tid] = (tuple(oid_list), user, desc, ext, packed)
+
+    def _getDataTIDFromData(self, oid, result):
+        """From a _getObject result, return (tid, data_serial): the
+        record's serial and the serial actually holding its data
+        (None when the creation was undone)."""
+        tid, _, _, _, data, value_serial = result
+        if data is None:
+            try:
+                data_serial = self._getObjectData(oid, value_serial, tid)[0]
+            except CreationUndone:
+                data_serial = None
+        else:
+            # Data stored inline: the record is its own data serial.
+            data_serial = tid
+        return tid, data_serial
+
+    def _getDataTID(self, oid, tid=None, before_tid=None):
+        """Like _getObject but resolved to (tid, data_serial);
+        (None, None) when no record matches."""
+        result = self._getObject(oid, tid=tid, before_tid=before_tid)
+        if result is None:
+            result = (None, None)
+        else:
+            result = self._getDataTIDFromData(oid, result)
+        return result
+
+    def finishTransaction(self, tid):
+        """Move transaction tid (packed) from the temporary trees to
+        the finished ones."""
+        tid = util.u64(tid)
+        obj = self._obj
+        tobj = self._tobj
+        ttrans = self._ttrans
+        def callback(oid, data):
+            # Re-home each popped object record under the finished tree.
+            try:
+                tserial = obj[oid]
+            except KeyError:
+                tserial = obj[oid] = OOBTree()
+            tserial[tid] = data
+        self._popTransactionFromObj(tobj, tid, callback=callback)
+        try:
+            data = ttrans[tid]
+        except KeyError:
+            pass
+        else:
+            del ttrans[tid]
+            self._trans[tid] = data
+
+    def _popTransactionFromObj(self, tree, tid, callback=None):
+        """Remove every record of (unpacked) tid from tree, calling
+        callback(oid, record) for each removed record, and dropping
+        per-oid sub-trees that become empty (batched by
+        KEY_BATCH_SIZE)."""
+        if callback is None:
+            callback = lambda oid, data: None
+        last = 0
+        while True:
+            to_remove = []
+            append = to_remove.append
+            for oid, tserial in tree.items(min=last):
+                try:
+                    data = tserial[tid]
+                except KeyError:
+                    continue
+                del tserial[tid]
+                if not tserial:
+                    append(oid)
+                callback(oid, data)
+                if len(to_remove) >= KEY_BATCH_SIZE:
+                    last = oid + 1
+                    break
+            if to_remove:
+                for oid in to_remove:
+                    # Recycle the now-empty sub-tree.
+                    prune(tree[oid])
+                    del tree[oid]
+            else:
+                break
+
+    def deleteTransaction(self, tid, all=False):
+        """Delete a temporary transaction; with all=True, also delete
+        it from finished data."""
+        tid = util.u64(tid)
+        self._popTransactionFromObj(self._tobj, tid)
+        try:
+            del self._ttrans[tid]
+        except KeyError:
+            pass
+        if all:
+            self._popTransactionFromObj(self._obj, tid)
+            try:
+                del self._trans[tid]
+            except KeyError:
+                pass
+
+    def deleteObject(self, oid, serial=None):
+        """Delete all records for packed oid, or only the record at
+        packed serial when given. Missing entries are ignored."""
+        u64 = util.u64
+        oid = u64(oid)
+        obj = self._obj
+        try:
+            tserial = obj[oid]
+        except KeyError:
+            pass
+        else:
+            if serial is None:
+                del obj[oid]
+            else:
+                serial = u64(serial)
+                try:
+                    del tserial[serial]
+                except KeyError:
+                    pass
+
+    def getTransaction(self, tid, all=False):
+        """Return (oid_list, user, desc, ext, packed) for packed tid,
+        or None. With all=True, unfinished transactions are searched
+        too."""
+        tid = util.u64(tid)
+        try:
+            result = self._trans[tid]
+        except KeyError:
+            if all:
+                try:
+                    result = self._ttrans[tid]
+                except KeyError:
+                    result = None
+            else:
+                result = None
+        if result is not None:
+            oid_list, user, desc, ext, packed = result
+            # Hand out a mutable copy of the stored oid tuple.
+            result = (list(oid_list), user, desc, ext, packed)
+        return result
+
+    def getOIDList(self, min_oid, length, num_partitions,
+            partition_list):
+        """Return up to length packed oids >= min_oid belonging to the
+        given partitions.
+
+        NOTE(review): min_oid is used directly as a tree key, so it is
+        presumably an already-unpacked integer -- verify against
+        callers (other methods here u64() their packed arguments).
+        """
+        p64 = util.p64
+        partition_list = frozenset(partition_list)
+        result = []
+        append = result.append
+        for oid in self._obj.keys(min=min_oid):
+            if oid % num_partitions in partition_list:
+                if length == 0:
+                    break
+                length -= 1
+                append(p64(oid))
+        return result
+
+ def _getObjectLength(self, oid, value_serial):
+ if value_serial is None:
+ raise CreationUndone
+ _, _, value, value_serial = self._obj[oid][value_serial]
+ if value is None:
+ neo.logging.info("Multiple levels of indirection when " \
+ "searching for object data for oid %d at tid %d. This " \
+ "causes suboptimal performance." % (oid, value_serial))
+ length = self._getObjectLength(oid, value_serial)
+ else:
+ length = len(value)
+ return length
+
+    def getObjectHistory(self, oid, offset=0, length=1):
+        """Return up to length (packed_serial, data_length) pairs for
+        packed oid, newest first, skipping the first offset revisions
+        and stopping at the last pack tid. Returns None when empty."""
+        # FIXME: This method doesn't take client's current transaction id as
+        # parameter, which means it can return transactions in the future of
+        # client's transaction.
+        oid = util.u64(oid)
+        p64 = util.p64
+        pack_tid = self._getPackTID()
+        try:
+            tserial = self._obj[oid]
+        except KeyError:
+            result = None
+        else:
+            result = []
+            append = result.append
+            tserial_iter = descItems(tserial)
+            while offset > 0:
+                # NOTE(review): raises StopIteration when offset exceeds
+                # the number of revisions -- confirm callers bound it.
+                tserial_iter.next()
+                offset -= 1
+            for serial, (_, _, value, value_serial) in tserial_iter:
+                if length == 0 or serial < pack_tid:
+                    break
+                length -= 1
+                if value is None:
+                    # Indirect record: fetch the real data length.
+                    try:
+                        data_length = self._getObjectLength(oid, value_serial)
+                    except CreationUndone:
+                        data_length = 0
+                else:
+                    data_length = len(value)
+                append((p64(serial), data_length))
+        if not result:
+            result = None
+        return result
+
+    def getObjectHistoryFrom(self, min_oid, min_serial, max_serial, length,
+        num_partitions, partition):
+        """Return {packed_oid: [packed_tid, ...]} for objects of the
+        given partition, scanning from (min_oid, min_serial) up to
+        max_serial, with at most length tids in total."""
+        u64 = util.u64
+        p64 = util.p64
+        min_oid = u64(min_oid)
+        min_serial = u64(min_serial)
+        max_serial = u64(max_serial)
+        result = {}
+        try:
+            oid_set = self._obj.items(min=min_oid)
+        except ValueError:
+            oid_set = []
+        for oid, tserial in oid_set:
+            if oid % num_partitions == partition:
+                result[p64(oid)] = tid_list = []
+                append = tid_list.append
+                if oid == min_oid:
+                    # min_serial only constrains the starting oid.
+                    try:
+                        tid_seq = tserial.keys(min=min_serial, max=max_serial)
+                    except ValueError:
+                        continue
+                else:
+                    tid_seq = tserial.keys(max=max_serial)
+                for tid in tid_seq:
+                    if length == 0:
+                        break
+                    length -= 1
+                    append(p64(tid))
+                else:
+                    # for/else: quota not exhausted, scan next oid.
+                    continue
+                # Quota exhausted (inner break): stop scanning.
+                break
+        return result
+
+    def getTIDList(self, offset, length, num_partitions, partition_list):
+        """Return up to length packed tids of the given partitions,
+        newest first, skipping the first offset matching tids."""
+        p64 = util.p64
+        partition_list = frozenset(partition_list)
+        result = []
+        append = result.append
+        trans_iter = descKeys(self._trans)
+        while offset > 0:
+            # NOTE(review): raises StopIteration when offset exceeds the
+            # number of matching tids -- confirm callers bound it.
+            tid = trans_iter.next()
+            if tid % num_partitions in partition_list:
+                offset -= 1
+        for tid in trans_iter:
+            if tid % num_partitions in partition_list:
+                if length == 0:
+                    break
+                length -= 1
+                append(p64(tid))
+        return result
+
+    def getReplicationTIDList(self, min_tid, max_tid, length, num_partitions,
+        partition):
+        """Return up to length packed tids of the given partition
+        within [min_tid, max_tid], in ascending order."""
+        p64 = util.p64
+        u64 = util.u64
+        result = []
+        append = result.append
+        try:
+            tid_seq = self._trans.keys(min=u64(min_tid), max=u64(max_tid))
+        except ValueError:
+            tid_seq = []
+        for tid in tid_seq:
+            if tid % num_partitions == partition:
+                if length == 0:
+                    break
+                length -= 1
+                append(p64(tid))
+        return result
+
+    def _updatePackFuture(self, oid, orig_serial, max_serial,
+            updateObjectDataForPack):
+        """Before the record at orig_serial gets dropped by pack,
+        migrate its data into the first later record still referencing
+        it, and repoint any further references there. Finally notify
+        updateObjectDataForPack about the move."""
+        p64 = util.p64
+        # Before deleting this objects revision, see if there is any
+        # transaction referencing its value at max_serial or above.
+        # If there is, copy value to the first future transaction. Any further
+        # reference is just updated to point to the new data location.
+        value_serial = None
+        obj = self._obj
+        for tree in (obj, self._tobj):
+            try:
+                tserial = tree[oid]
+            except KeyError:
+                continue
+            for serial, record in tserial.items(
+                min=max_serial):
+                if record[3] == orig_serial:
+                    if value_serial is None:
+                        # First future reference inherits the data.
+                        value_serial = serial
+                        tserial[serial] = tserial[orig_serial]
+                    else:
+                        # Later references point at the new location.
+                        record = list(record)
+                        record[3] = value_serial
+                        tserial[serial] = tuple(record)
+        def getObjectData():
+            # Lazy accessor for (compression, checksum, data) of the
+            # record being dropped; only valid when no future reference
+            # took the data over.
+            assert value_serial is None
+            return obj[oid][orig_serial][:3]
+        if value_serial:
+            value_serial = p64(value_serial)
+        updateObjectDataForPack(p64(oid), p64(orig_serial), value_serial,
+            getObjectData)
+
+ def pack(self, tid, updateObjectDataForPack):
+ tid = util.u64(tid)
+ updatePackFuture = self._updatePackFuture
+ self._setPackTID(tid)
+ obj = self._obj
+ last_obj = 0
+ while True:
+ obj_to_drop = []
+ append_obj = obj_to_drop.append
+ for oid, tserial in obj.items(min=last_obj):
+ try:
+ max_serial = tserial.maxKey(tid)
+ except ValueError:
+ continue
+ try:
+ tserial.maxKey(max_serial)
+ except ValueError:
+ if tserial[max_serial][2] == '':
+ max_serial += 1
+ else:
+ continue
+ last = 0
+ while True:
+ to_drop = []
+ append = to_drop.append
+ for serial in tserial.keys(min=last, max=max_serial,
+ excludemax=True):
+ updatePackFuture(oid, serial, max_serial,
+ updateObjectDataForPack)
+ append(serial)
+ if len(to_drop) >= KEY_BATCH_SIZE:
+ last = serial + 1
+ break
+ if to_drop:
+ for serial in to_drop:
+ del tserial[serial]
+ else:
+ break
+ if not tserial:
+ append_obj(oid)
+ if len(obj_to_drop) >= KEY_BATCH_SIZE:
+ last_obj = oid + 1
+ break
+ if obj_to_drop:
+ for oid in to_drop:
+ prune(obj[oid])
+ del obj[oid]
+ else:
+ break
+
+    def checkTIDRange(self, min_tid, length, num_partitions, partition):
+        """Return (count, xor_checksum, packed_max_tid) over up to
+        length tids of the given partition starting at min_tid."""
+        # XXX: XOR is a lame checksum
+        count = 0
+        tid_checksum = 0
+        max_tid = 0
+        for max_tid in self._trans.keys(min=util.u64(min_tid)):
+            if max_tid % num_partitions == partition:
+                if count >= length:
+                    break
+                tid_checksum ^= max_tid
+                count += 1
+        # NOTE(review): the returned max_tid may belong to another
+        # partition or be the first tid past the length limit, since
+        # the loop variable is updated before the checks -- confirm
+        # whether that is intended.
+        return count, tid_checksum, util.p64(max_tid)
+
+    def checkSerialRange(self, min_oid, min_serial, length, num_partitions,
+        partition):
+        """Return (count, oid_xor, packed_max_oid, serial_xor,
+        packed_max_serial) over up to length (oid, serial) pairs of
+        the given partition starting at (min_oid, min_serial)."""
+        # XXX: XOR is a lame checksum
+        u64 = util.u64
+        p64 = util.p64
+        min_oid = u64(min_oid)
+        count = 0
+        oid_checksum = serial_checksum = 0
+        max_oid = max_serial = 0
+        for max_oid, tserial in self._obj.items(min=min_oid):
+            if max_oid % num_partitions == partition:
+                if max_oid == min_oid:
+                    # min_serial only constrains the starting oid.
+                    try:
+                        serial_iter = tserial.keys(min=u64(min_serial))
+                    except ValueError:
+                        continue
+                else:
+                    serial_iter = tserial.keys()
+                for max_serial in serial_iter:
+                    if count >= length:
+                        break
+                    oid_checksum ^= max_oid
+                    serial_checksum ^= max_serial
+                    count += 1
+                if count >= length:
+                    break
+        return count, oid_checksum, p64(max_oid), serial_checksum, p64(max_serial)
+
Added: trunk/neo/tests/storage/testStorageBTree.py
==============================================================================
--- trunk/neo/tests/storage/testStorageBTree.py (added)
+++ trunk/neo/tests/storage/testStorageBTree.py [iso-8859-1] Tue Nov 9 20:41:26 2010
@@ -0,0 +1,34 @@
+#
+# Copyright (C) 2009-2010 Nexedi SA
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+import unittest
+from mock import Mock
+from neo.tests.storage.testStorageDBTests import StorageDBTests
+from neo.storage.database.btree import BTreeDatabaseManager
+
+class StorageBTreeTests(StorageDBTests):
+    """Run the generic StorageDBTests suite against the BTree back-end."""
+
+    def getDB(self):
+        # Build an in-RAM db manager; the database argument is unused
+        # by this back-end.
+        db = BTreeDatabaseManager('')
+        db.setup()
+        return db
+
+del StorageDBTests
+
+if __name__ == "__main__":
+ unittest.main()
Modified: trunk/tools/runner
==============================================================================
--- trunk/tools/runner [iso-8859-1] (original)
+++ trunk/tools/runner [iso-8859-1] Tue Nov 9 20:41:26 2010
@@ -56,6 +56,7 @@ UNIT_TEST_MODULES = [
'neo.tests.storage.testStorageApp',
'neo.tests.storage.testStorageHandler',
'neo.tests.storage.testStorageMySQLdb',
+ 'neo.tests.storage.testStorageBTree',
'neo.tests.storage.testVerificationHandler',
'neo.tests.storage.testIdentificationHandler',
'neo.tests.storage.testTransactions',
More information about the Neo-report
mailing list