[Neo-report] r2615 olivier.cros [1/2] - in /trunk: neo/ neo/admin/ neo/client/ neo/client/...
nobody at svn.erp5.org
Mon Jan 17 16:25:38 CET 2011
Author: olivier.cros
Date: Mon Jan 17 16:25:37 2011
New Revision: 2615
Log:
Move source code from neo/ to neo/lib
In order to prepare the eggification of the different neo parts, we created a
new neo/lib module containing all of neo's main functions. This makes neo a
virtual namespace that no longer contains any module code itself.
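The namespace idea described in the log can be sketched as below. This is an
illustration only, not code from this revision: the commit itself uses
pkg_resources.declare_namespace, while the sketch swaps in the standard
library's pkgutil.extend_path so that it runs without setuptools. The package
name demo_ns and the helpers make_distribution and build_demo are hypothetical.

```python
import importlib
import os
import sys
import tempfile

# pkgutil-style namespace declaration (stand-in for pkg_resources here).
NAMESPACE_INIT = (
    "__path__ = __import__('pkgutil').extend_path(__path__, __name__)\n"
)


def make_distribution(root, namespace, subpackage, body):
    """Create <root>/<namespace>/<subpackage>/__init__.py with the given body.

    The namespace package's own __init__.py holds only the declaration,
    so it carries no module code.
    """
    pkg_dir = os.path.join(root, namespace, subpackage)
    os.makedirs(pkg_dir)
    with open(os.path.join(root, namespace, "__init__.py"), "w") as f:
        f.write(NAMESPACE_INIT)
    with open(os.path.join(pkg_dir, "__init__.py"), "w") as f:
        f.write(body)


def build_demo():
    """Build two separate 'eggs' sharing one namespace and import both."""
    lib_root = tempfile.mkdtemp()
    client_root = tempfile.mkdtemp()
    make_distribution(lib_root, "demo_ns", "lib", "NAME = 'lib'\n")
    make_distribution(
        client_root, "demo_ns", "client",
        "from demo_ns.lib import NAME as LIB_NAME\nNAME = 'client'\n",
    )
    # Both roots must be on sys.path before the namespace is first imported.
    sys.path[:0] = [lib_root, client_root]
    importlib.invalidate_caches()
    lib = importlib.import_module("demo_ns.lib")
    client = importlib.import_module("demo_ns.client")
    return lib, client


if __name__ == "__main__":
    lib, client = build_demo()
    print(lib.NAME, client.NAME, client.LIB_NAME)
```

Because the shared package holds nothing but the declaration, each part can
live in its own directory (or egg) and still import the others by their full
dotted names.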
Added:
trunk/neo/__init__.py
trunk/neo/lib/
trunk/neo/lib/__init__.py
- copied unchanged from r2614, trunk/neo/__init__.py
trunk/neo/lib/attributeTracker.py
- copied, changed from r2614, trunk/neo/attributeTracker.py
trunk/neo/lib/bootstrap.py
- copied, changed from r2614, trunk/neo/bootstrap.py
trunk/neo/lib/config.py
- copied, changed from r2614, trunk/neo/config.py
trunk/neo/lib/connection.py
- copied, changed from r2614, trunk/neo/connection.py
trunk/neo/lib/connector.py
- copied unchanged from r2614, trunk/neo/connector.py
trunk/neo/lib/dispatcher.py
- copied, changed from r2614, trunk/neo/dispatcher.py
trunk/neo/lib/epoll.py
- copied unchanged from r2614, trunk/neo/epoll.py
trunk/neo/lib/event.py
- copied, changed from r2614, trunk/neo/event.py
trunk/neo/lib/exception.py
- copied unchanged from r2614, trunk/neo/exception.py
trunk/neo/lib/handler.py
- copied, changed from r2614, trunk/neo/handler.py
trunk/neo/lib/live_debug.py
- copied unchanged from r2614, trunk/neo/live_debug.py
trunk/neo/lib/locking.py
- copied unchanged from r2614, trunk/neo/locking.py
trunk/neo/lib/logger.py
- copied, changed from r2614, trunk/neo/logger.py
trunk/neo/lib/node.py
- copied, changed from r2614, trunk/neo/node.py
trunk/neo/lib/profiling.py
- copied unchanged from r2614, trunk/neo/profiling.py
trunk/neo/lib/protocol.py
- copied, changed from r2614, trunk/neo/protocol.py
trunk/neo/lib/pt.py
- copied, changed from r2614, trunk/neo/pt.py
trunk/neo/lib/util.py
- copied unchanged from r2614, trunk/neo/util.py
trunk/neo/scripts/runner.py
- copied unchanged from r2614, trunk/tools/runner
Removed:
trunk/neo/attributeTracker.py
trunk/neo/bootstrap.py
trunk/neo/config.py
trunk/neo/connection.py
trunk/neo/connector.py
trunk/neo/dispatcher.py
trunk/neo/epoll.py
trunk/neo/event.py
trunk/neo/exception.py
trunk/neo/handler.py
trunk/neo/live_debug.py
trunk/neo/locking.py
trunk/neo/logger.py
trunk/neo/node.py
trunk/neo/profiling.py
trunk/neo/protocol.py
trunk/neo/pt.py
trunk/neo/util.py
trunk/tools/runner
Modified:
trunk/neo/admin/app.py
trunk/neo/admin/handler.py
trunk/neo/client/Storage.py
trunk/neo/client/app.py
trunk/neo/client/handlers/__init__.py
trunk/neo/client/handlers/master.py
trunk/neo/client/handlers/storage.py
trunk/neo/client/iterator.py
trunk/neo/client/poll.py
trunk/neo/client/pool.py
trunk/neo/master/app.py
trunk/neo/master/handlers/__init__.py
trunk/neo/master/handlers/administration.py
trunk/neo/master/handlers/client.py
trunk/neo/master/handlers/election.py
trunk/neo/master/handlers/identification.py
trunk/neo/master/handlers/secondary.py
trunk/neo/master/handlers/shutdown.py
trunk/neo/master/handlers/storage.py
trunk/neo/master/pt.py
trunk/neo/master/recovery.py
trunk/neo/master/transactions.py
trunk/neo/master/verification.py
trunk/neo/neoctl/app.py
trunk/neo/neoctl/handler.py
trunk/neo/neoctl/neoctl.py
trunk/neo/scripts/__init__.py
trunk/neo/scripts/neoadmin.py
trunk/neo/scripts/neoctl.py
trunk/neo/scripts/neomaster.py
trunk/neo/scripts/neomigrate.py
trunk/neo/scripts/neostorage.py
trunk/neo/storage/app.py
trunk/neo/storage/database/__init__.py
trunk/neo/storage/database/btree.py
trunk/neo/storage/database/manager.py
trunk/neo/storage/database/mysqldb.py
trunk/neo/storage/handlers/__init__.py
trunk/neo/storage/handlers/client.py
trunk/neo/storage/handlers/hidden.py
trunk/neo/storage/handlers/identification.py
trunk/neo/storage/handlers/initialization.py
trunk/neo/storage/handlers/master.py
trunk/neo/storage/handlers/replication.py
trunk/neo/storage/handlers/storage.py
trunk/neo/storage/handlers/verification.py
trunk/neo/storage/replicator.py
trunk/neo/storage/transactions.py
trunk/neo/tests/__init__.py
trunk/neo/tests/client/testClientApp.py
trunk/neo/tests/client/testMasterHandler.py
trunk/neo/tests/client/testStorageHandler.py
trunk/neo/tests/functional/__init__.py
trunk/neo/tests/functional/testMaster.py
trunk/neo/tests/functional/testStorage.py
trunk/neo/tests/master/testClientHandler.py
trunk/neo/tests/master/testElectionHandler.py
trunk/neo/tests/master/testMasterApp.py
trunk/neo/tests/master/testMasterPT.py
trunk/neo/tests/master/testRecovery.py
trunk/neo/tests/master/testStorageHandler.py
trunk/neo/tests/master/testTransactions.py
trunk/neo/tests/master/testVerification.py
trunk/neo/tests/storage/testClientHandler.py
trunk/neo/tests/storage/testIdentificationHandler.py
trunk/neo/tests/storage/testInitializationHandler.py
trunk/neo/tests/storage/testMasterHandler.py
trunk/neo/tests/storage/testReplication.py
trunk/neo/tests/storage/testReplicationHandler.py
trunk/neo/tests/storage/testReplicator.py
trunk/neo/tests/storage/testStorageApp.py
trunk/neo/tests/storage/testStorageDBTests.py
trunk/neo/tests/storage/testStorageHandler.py
trunk/neo/tests/storage/testStorageMySQLdb.py
trunk/neo/tests/storage/testVerificationHandler.py
trunk/neo/tests/testBootstrap.py
trunk/neo/tests/testConnection.py
trunk/neo/tests/testDispatcher.py
trunk/neo/tests/testEvent.py
trunk/neo/tests/testHandler.py
trunk/neo/tests/testNodes.py
trunk/neo/tests/testPT.py
trunk/neo/tests/testProtocol.py
trunk/neo/tests/testUtil.py
trunk/neo/tests/zodb/__init__.py
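The changes to the modified files follow one mechanical pattern: imports of the
moved modules gain a "lib" component, and neo.logging becomes neo.lib.logging.
A minimal sketch of such a rewrite is given below. It is an assumption about
how the change could be automated, not the tool (if any) used for this
revision; the names MOVED, rewrite_line and rewrite_source are hypothetical,
and the module list is taken from the "Removed:" section above.

```python
import re

# Modules moved from neo/ to neo/lib/ in this revision.
MOVED = (
    "attributeTracker", "bootstrap", "config", "connection", "connector",
    "dispatcher", "epoll", "event", "exception", "handler", "live_debug",
    "locking", "logger", "node", "profiling", "protocol", "pt", "util",
)

_MODULES = "|".join(MOVED)
# "from neo.node import X"  ->  "from neo.lib.node import X"
_FROM_MODULE = re.compile(r"^(\s*from\s+neo)\.(%s)(\s+import\b)" % _MODULES)
# "from neo import protocol"  ->  "from neo.lib import protocol"
_FROM_PACKAGE = re.compile(r"^(\s*from\s+neo)(\s+import\b)")
# "import neo"  ->  "import neo.lib"
_BARE_IMPORT = re.compile(r"^(\s*import\s+neo)\s*$")
# "neo.logging.debug(...)"  ->  "neo.lib.logging.debug(...)"
_LOGGING = re.compile(r"\bneo\.logging\.")


def rewrite_line(line):
    """Rewrite one source line (without its trailing newline)."""
    line = _FROM_MODULE.sub(r"\1.lib.\2\3", line)
    line = _FROM_PACKAGE.sub(r"\1.lib\2", line)
    line = _BARE_IMPORT.sub(r"\1.lib", line)
    line = _LOGGING.sub("neo.lib.logging.", line)
    return line


def rewrite_source(source):
    """Apply rewrite_line to every line of a module's source text."""
    return "\n".join(rewrite_line(line) for line in source.split("\n"))


if __name__ == "__main__":
    sample = "import neo\nfrom neo.node import NodeManager\n"
    print(rewrite_source(sample))
```

Imports of packages that did not move, such as neo.client.handlers, are left
alone because their first component is not in MOVED. Lines that were already
rewritten do not match any pattern, so running the sketch twice changes
nothing further.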
Added: trunk/neo/__init__.py
==============================================================================
--- trunk/neo/__init__.py (added)
+++ trunk/neo/__init__.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -0,0 +1,3 @@
+import pkg_resources
+pkg_resources.declare_namespace(__name__)
+
Modified: trunk/neo/admin/app.py
==============================================================================
--- trunk/neo/admin/app.py [iso-8859-1] (original)
+++ trunk/neo/admin/app.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,19 +15,19 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-import neo
+import neo.lib
-from neo.node import NodeManager
-from neo.event import EventManager
-from neo.connection import ListeningConnection
-from neo.exception import PrimaryFailure
+from neo.lib.node import NodeManager
+from neo.lib.event import EventManager
+from neo.lib.connection import ListeningConnection
+from neo.lib.exception import PrimaryFailure
from neo.admin.handler import AdminEventHandler, MasterEventHandler, \
MasterRequestEventHandler
-from neo.connector import getConnectorHandler
-from neo.bootstrap import BootstrapManager
-from neo.pt import PartitionTable
-from neo.protocol import NodeTypes, NodeStates, Packets, Errors
-from neo.live_debug import register as registerLiveDebugger
+from neo.lib.connector import getConnectorHandler
+from neo.lib.bootstrap import BootstrapManager
+from neo.lib.pt import PartitionTable
+from neo.lib.protocol import NodeTypes, NodeStates, Packets, Errors
+from neo.lib.live_debug import register as registerLiveDebugger
class Dispatcher:
"""Dispatcher use to redirect master request to handler"""
@@ -68,7 +68,7 @@ class Application(object):
self.server = config.getBind()
self.master_addresses = config.getMasters()
- neo.logging.debug('IP address is %s, port is %d', *(self.server))
+ neo.lib.logging.debug('IP address is %s, port is %d', *(self.server))
# The partition table is initialized after getting the number of
# partitions.
@@ -105,7 +105,7 @@ class Application(object):
while True:
self.em.poll(1)
except PrimaryFailure:
- neo.logging.error('primary master is down')
+ neo.lib.logging.error('primary master is down')
def connectToPrimary(self):
Modified: trunk/neo/admin/handler.py
==============================================================================
--- trunk/neo/admin/handler.py [iso-8859-1] (original)
+++ trunk/neo/admin/handler.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,11 +17,11 @@
import neo
-from neo.handler import EventHandler
-from neo import protocol
-from neo.protocol import Packets, Errors
-from neo.exception import PrimaryFailure
-from neo.util import dump
+from neo.lib.handler import EventHandler
+from neo.lib import protocol
+from neo.lib.protocol import Packets, Errors
+from neo.lib.exception import PrimaryFailure
+from neo.lib.util import dump
def forward_ask(klass):
def wrapper(self, conn, *args, **kw):
@@ -42,7 +42,7 @@ class AdminEventHandler(EventHandler):
"""This class deals with events for administrating cluster."""
def askPartitionList(self, conn, min_offset, max_offset, uuid):
- neo.logging.info("ask partition list from %s to %s for %s" %
+ neo.lib.logging.info("ask partition list from %s to %s for %s" %
(min_offset, max_offset, dump(uuid)))
app = self.app
# check we have one pt otherwise ask it to PMN
@@ -61,7 +61,7 @@ class AdminEventHandler(EventHandler):
def askNodeList(self, conn, node_type):
- neo.logging.info("ask node list for %s" %(node_type))
+ neo.lib.logging.info("ask node list for %s" %(node_type))
def node_filter(n):
return n.getType() is node_type
node_list = self.app.nm.getList(node_filter)
@@ -70,7 +70,7 @@ class AdminEventHandler(EventHandler):
conn.answer(p)
def setNodeState(self, conn, uuid, state, modify_partition_table):
- neo.logging.info("set node state for %s-%s" %(dump(uuid), state))
+ neo.lib.logging.info("set node state for %s-%s" %(dump(uuid), state))
node = self.app.nm.getByUUID(uuid)
if node is None:
raise protocol.ProtocolError('invalid uuid')
@@ -144,7 +144,7 @@ class MasterEventHandler(EventHandler):
def answerNodeInformation(self, conn):
# XXX: This will no more exists when the initialization module will be
# implemented for factorize code (as done for bootstrap)
- neo.logging.debug("answerNodeInformation")
+ neo.lib.logging.debug("answerNodeInformation")
def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm)
@@ -176,12 +176,12 @@ class MasterRequestEventHandler(EventHan
client_conn.answer(packet)
def answerClusterState(self, conn, state):
- neo.logging.info("answerClusterState for a conn")
+ neo.lib.logging.info("answerClusterState for a conn")
self.app.cluster_state = state
self._answerNeoCTL(conn, Packets.AnswerClusterState(state))
def answerPartitionTable(self, conn, ptid, row_list):
- neo.logging.info("answerPartitionTable for a conn")
+ neo.lib.logging.info("answerPartitionTable for a conn")
client_conn, kw = self.app.dispatcher.pop(conn.getPeerId())
# sent client the partition table
self.app.sendPartitionTable(client_conn)
Removed: trunk/neo/attributeTracker.py
==============================================================================
--- trunk/neo/attributeTracker.py [iso-8859-1] (original)
+++ trunk/neo/attributeTracker.py (removed)
@@ -1,65 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-ATTRIBUTE_TRACKER_ENABLED = False
-
-from neo.locking import LockUser
-
-"""
- Usage example:
-
- from neo import attributeTracker
-
- class Foo(object):
-
- ...
-
- def assertBar(self, expected_value):
- if self.bar_attr != expected_value:
- attributeTracker.whoSet(self, 'bar_attr')
-
- attributeTracker.track(Foo)
-"""
-
-MODIFICATION_CONTAINER_ID = '_attribute_tracker_dict'
-
-def tracker_setattr(self, attr, value, setattr):
- modification_container = getattr(self, MODIFICATION_CONTAINER_ID, None)
- if modification_container is None:
- modification_container = {}
- setattr(self, MODIFICATION_CONTAINER_ID, modification_container)
- modification_container[attr] = LockUser()
- setattr(self, attr, value)
-
-if ATTRIBUTE_TRACKER_ENABLED:
- def track(klass):
- original_setattr = klass.__setattr__
- def klass_tracker_setattr(self, attr, value):
- tracker_setattr(self, attr, value, original_setattr)
- klass.__setattr__ = klass_tracker_setattr
-else:
- def track(klass):
- pass
-
-def whoSet(instance, attr):
- result = getattr(instance, MODIFICATION_CONTAINER_ID, None)
- if result is not None:
- result = result.get(attr)
- if result is not None:
- result = result.formatStack()
- return result
-
Removed: trunk/neo/bootstrap.py
==============================================================================
--- trunk/neo/bootstrap.py [iso-8859-1] (original)
+++ trunk/neo/bootstrap.py (removed)
@@ -1,158 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-import neo
-from time import sleep
-
-from neo.handler import EventHandler
-from neo.protocol import Packets
-from neo.util import dump
-from neo.connection import ClientConnection
-
-NO_SERVER = ('0.0.0.0', 0)
-
-class BootstrapManager(EventHandler):
- """
- Manage the bootstrap stage, lookup for the primary master then connect to it
- """
-
- def __init__(self, app, name, node_type, uuid=None, server=NO_SERVER):
- """
- Manage the bootstrap stage of a non-master node, it lookup for the
- primary master node, connect to it then returns when the master node
- is ready.
- """
- EventHandler.__init__(self, app)
- self.primary = None
- self.server = server
- self.node_type = node_type
- self.uuid = uuid
- self.name = name
- self.num_replicas = None
- self.num_partitions = None
- self.current = None
-
- def connectionCompleted(self, conn):
- """
- Triggered when the network connection is successful.
- Now ask who's the primary.
- """
- EventHandler.connectionCompleted(self, conn)
- self.current.setRunning()
- conn.ask(Packets.AskPrimary())
-
- def connectionFailed(self, conn):
- """
- Triggered when the network connection failed.
- Restart bootstrap.
- """
- EventHandler.connectionFailed(self, conn)
- self.current = None
-
- def connectionLost(self, conn, new_state):
- """
- Triggered when an established network connection is lost.
- Restart bootstrap.
- """
- self.current.setTemporarilyDown()
- self.current = None
-
- def notReady(self, conn, message):
- """
- The primary master send this message when it is still not ready to
- handle the client node.
- Close connection and restart.
- """
- # master are still electing on of them
- self.current = None
- conn.close()
-
- def answerPrimary(self, conn, primary_uuid, known_master_list):
- """
- A master answer who's the primary. If it's another node, connect to it.
- If it's itself then the primary is successfully found, ask
- identification.
- """
- nm = self.app.nm
-
- # Register new master nodes.
- for address, uuid in known_master_list:
- node = nm.getByAddress(address)
- if node is None:
- node = nm.createMaster(address=address)
- node.setUUID(uuid)
-
- self.primary = nm.getByUUID(primary_uuid)
- if self.primary is None or self.current is not self.primary:
- # three cases here:
- # - something goes wrong (unknown UUID)
- # - this master doesn't know who's the primary
- # - got the primary's uuid, so cut here
- self.current = None
- conn.close()
- return
-
- neo.logging.info('connected to a primary master node')
- conn.ask(Packets.RequestIdentification(self.node_type,
- self.uuid, self.server, self.name))
-
- def acceptIdentification(self, conn, node_type,
- uuid, num_partitions, num_replicas, your_uuid):
- """
- The primary master has accepted the node.
- """
- self.num_partitions = num_partitions
- self.num_replicas = num_replicas
- if self.uuid != your_uuid:
- # got an uuid from the primary master
- self.uuid = your_uuid
- neo.logging.info('Got a new UUID : %s' % dump(self.uuid))
- conn.setUUID(uuid)
-
- def getPrimaryConnection(self, connector_handler):
- """
- Primary lookup/connection process.
- Returns when the connection is made.
- """
- neo.logging.info('connecting to a primary master node')
- em, nm = self.app.em, self.app.nm
- index = 0
- self.current = nm.getMasterList()[0]
- conn = None
- # retry until identified to the primary
- while self.primary is None or conn.getUUID() != self.primary.getUUID():
- if self.current is None:
- # conn closed
- conn = None
- # select a master
- master_list = nm.getMasterList()
- index = (index + 1) % len(master_list)
- self.current = master_list[index]
- if index == 0:
- # tried all known masters, sleep a bit
- sleep(1)
- if conn is None:
- # open the connection
- addr = self.current.getAddress()
- conn = ClientConnection(em, self, addr, connector_handler())
- # still processing
- em.poll(1)
- node = nm.getByUUID(conn.getUUID())
- return (node, conn, self.uuid, self.num_partitions, self.num_replicas)
-
-
-
Modified: trunk/neo/client/Storage.py
==============================================================================
--- trunk/neo/client/Storage.py [iso-8859-1] (original)
+++ trunk/neo/client/Storage.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -19,9 +19,9 @@ from ZODB import BaseStorage, ConflictRe
from zope.interface import implements
import ZODB.interfaces
-from neo import setupLog
-from neo.util import add64
-from neo.protocol import ZERO_TID
+from neo.lib import setupLog
+from neo.lib.util import add64
+from neo.lib.protocol import ZERO_TID
from neo.client.app import Application
from neo.client.exception import NEOStorageNotFoundError
from neo.client.exception import NEOStorageDoesNotExistError
@@ -255,7 +255,8 @@ class Storage(BaseStorage.BaseStorage,
def pack(self, t, referencesf, gc=False):
if gc:
- neo.logging.warning('Garbage Collection is not available in NEO, '
+ neo.lib.logging.warning(
+ 'Garbage Collection is not available in NEO, '
'please use an external tool. Packing without GC.')
self.app.pack(t)
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -18,7 +18,7 @@
from thread import get_ident
from cPickle import dumps, loads
from zlib import compress as real_compress, decompress
-from neo.locking import Queue, Empty
+from neo.lib.locking import Queue, Empty
from random import shuffle
import time
import os
@@ -28,26 +28,26 @@ from ZODB.POSException import ReadConfli
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp
-import neo
-from neo.protocol import NodeTypes, Packets, INVALID_PARTITION, ZERO_TID
-from neo.event import EventManager
-from neo.util import makeChecksum as real_makeChecksum, dump
-from neo.locking import Lock
-from neo.connection import MTClientConnection, OnTimeout, ConnectionClosed
-from neo.node import NodeManager
-from neo.connector import getConnectorHandler
+import neo.lib
+from neo.lib.protocol import NodeTypes, Packets, INVALID_PARTITION, ZERO_TID
+from neo.lib.event import EventManager
+from neo.lib.util import makeChecksum as real_makeChecksum, dump
+from neo.lib.locking import Lock
+from neo.lib.connection import MTClientConnection, OnTimeout, ConnectionClosed
+from neo.lib.node import NodeManager
+from neo.lib.connector import getConnectorHandler
from neo.client.exception import NEOStorageError, NEOStorageCreationUndoneError
from neo.client.exception import NEOStorageNotFoundError
-from neo.exception import NeoException
+from neo.lib.exception import NeoException
from neo.client.handlers import storage, master
-from neo.dispatcher import Dispatcher, ForgottenPacket
+from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from neo.client.poll import ThreadedPoll, psThreadedPoll
from neo.client.iterator import Iterator
from neo.client.mq import MQ, MQIndex
from neo.client.pool import ConnectionPool
-from neo.util import u64, parseMasterList
-from neo.profiling import profiler_decorator, PROFILING_ENABLED
-from neo.live_debug import register as registerLiveDebugger
+from neo.lib.util import u64, parseMasterList
+from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED
+from neo.lib.live_debug import register as registerLiveDebugger
if PROFILING_ENABLED:
# Those functions require a "real" python function wrapper before they can
@@ -441,7 +441,7 @@ class Application(object):
"""
Lookup for the current primary master node
"""
- neo.logging.debug('connecting to primary master...')
+ neo.lib.logging.debug('connecting to primary master...')
ready = False
nm = self.nm
queue = self.local_var.queue
@@ -473,7 +473,8 @@ class Application(object):
# Query for primary master node
if conn.getConnector() is None:
# This happens if a connection could not be established.
- neo.logging.error('Connection to master node %s failed',
+ neo.lib.logging.error(
+ 'Connection to master node %s failed',
self.trying_master_node)
continue
try:
@@ -485,15 +486,16 @@ class Application(object):
# If we reached the primary master node, mark as connected
connected = self.primary_master_node is not None and \
self.primary_master_node is self.trying_master_node
- neo.logging.info('Connected to %s' % (self.primary_master_node, ))
+ neo.lib.logging.info(
+ 'Connected to %s' % (self.primary_master_node, ))
try:
ready = self.identifyToPrimaryNode(conn)
except ConnectionClosed:
- neo.logging.error('Connection to %s lost',
+ neo.lib.logging.error('Connection to %s lost',
self.trying_master_node)
self.primary_master_node = None
continue
- neo.logging.info("Connected and ready")
+ neo.lib.logging.info("Connected and ready")
return conn
def identifyToPrimaryNode(self, conn):
@@ -502,7 +504,7 @@ class Application(object):
Might raise ConnectionClosed so that the new primary can be
looked-up again.
"""
- neo.logging.info('Initializing from master')
+ neo.lib.logging.info('Initializing from master')
queue = self.local_var.queue
# Identify to primary master and request initial data
while conn.getUUID() is None:
@@ -634,13 +636,13 @@ class Application(object):
= self.local_var.asked_object
if noid != oid:
# Oops, try with next node
- neo.logging.error('got wrong oid %s instead of %s from %s',
+ neo.lib.logging.error('got wrong oid %s instead of %s from %s',
noid, dump(oid), conn)
self.local_var.asked_object = -1
continue
elif checksum != makeChecksum(data):
# Check checksum.
- neo.logging.error('wrong checksum from %s for oid %s',
+ neo.lib.logging.error('wrong checksum from %s for oid %s',
conn, dump(oid))
self.local_var.asked_object = -1
continue
@@ -696,7 +698,8 @@ class Application(object):
"""Store object."""
if transaction is not self.local_var.txn:
raise StorageTransactionError(self, transaction)
- neo.logging.debug('storing oid %s serial %s', dump(oid), dump(serial))
+ neo.lib.logging.debug(
+ 'storing oid %s serial %s', dump(oid), dump(serial))
self._store(oid, serial, data)
return None
@@ -792,7 +795,7 @@ class Application(object):
# them), and requeue our already-sent store requests.
# XXX: currently, brute-force is implemented: we send
# object data again.
- neo.logging.info('Deadlock avoidance triggered on %r:%r',
+ neo.lib.logging.info('Deadlock avoidance triggered on %r:%r',
dump(oid), dump(serial))
for store_oid, store_data in \
local_var.data_dict.iteritems():
@@ -803,7 +806,7 @@ class Application(object):
else:
if store_data is '':
# Some undo
- neo.logging.warning('Deadlock avoidance cannot'
+ neo.lib.logging.warning('Deadlock avoidance cannot'
' reliably work with undo, this must be '
'implemented.')
break
@@ -815,7 +818,7 @@ class Application(object):
new_data = tryToResolveConflict(oid, conflict_serial,
serial, data)
if new_data is not None:
- neo.logging.info('Conflict resolution succeed for ' \
+ neo.lib.logging.info('Conflict resolution succeed for ' \
'%r:%r with %r', dump(oid), dump(serial),
dump(conflict_serial))
# Mark this conflict as resolved
@@ -827,7 +830,7 @@ class Application(object):
append(oid)
resolved = True
else:
- neo.logging.info('Conflict resolution failed for ' \
+ neo.lib.logging.info('Conflict resolution failed for ' \
'%r:%r with %r', dump(oid), dump(serial),
dump(conflict_serial))
if not resolved:
@@ -877,7 +880,7 @@ class Application(object):
for oid, store_dict in \
local_var.object_stored_counter_dict.iteritems():
if not store_dict:
- neo.logging.error('tpc_store failed')
+ neo.lib.logging.error('tpc_store failed')
raise NEOStorageError('tpc_store failed')
elif oid in resolved_oid_set:
append((oid, ResolvedSerial))
@@ -900,7 +903,7 @@ class Application(object):
local_var.data_list)
add_involved_nodes = self.local_var.involved_nodes.add
for node, conn in self.cp.iterateForObject(tid, writable=True):
- neo.logging.debug("voting object %s on %s", dump(tid),
+ neo.lib.logging.debug("voting object %s on %s", dump(tid),
dump(conn.getUUID()))
try:
self._askStorage(conn, packet)
@@ -911,7 +914,7 @@ class Application(object):
# check at least one storage node accepted
if txn_stored_counter == 0:
- neo.logging.error('tpc_vote failed')
+ neo.lib.logging.error('tpc_vote failed')
raise NEOStorageError('tpc_vote failed')
# Check if master connection is still alive.
# This is just here to lower the probability of detecting a problem
@@ -939,8 +942,10 @@ class Application(object):
try:
conn.notify(p)
except:
- neo.logging.error('Exception in tpc_abort while notifying ' \
- 'storage node %r of abortion, ignoring.', conn, exc_info=1)
+ neo.lib.logging.error(
+ 'Exception in tpc_abort while notifying ' \
+ 'storage node %r of abortion, ignoring.',
+ conn, exc_info=1)
self._getMasterConnection().notify(p)
# Just wait for responses to arrive. If any leads to an exception,
@@ -953,8 +958,10 @@ class Application(object):
try:
_waitAnyMessage()
except:
- neo.logging.error('Exception in tpc_abort while handling ' \
- 'pending answers, ignoring.', exc_info=1)
+ neo.lib.logging.error(
+ 'Exception in tpc_abort while ' \
+ 'handling pending answers, ignoring.',
+ exc_info=1)
self.local_var.clear()
@@ -1140,7 +1147,8 @@ class Application(object):
update(tid_list)
ordered_tids = list(ordered_tids)
ordered_tids.sort(reverse=True)
- neo.logging.debug("UndoLog tids %s", [dump(x) for x in ordered_tids])
+ neo.lib.logging.debug(
+ "UndoLog tids %s", [dump(x) for x in ordered_tids])
# For each transaction, get info
undo_info = []
append = undo_info.append
@@ -1275,7 +1283,7 @@ class Application(object):
self.cp.flush()
self.master_conn = None
# Stop polling thread
- neo.logging.debug('Stopping %s', self.poll_thread)
+ neo.lib.logging.debug('Stopping %s', self.poll_thread)
self.poll_thread.stop()
psThreadedPoll()
close = __del__
Modified: trunk/neo/client/handlers/__init__.py
==============================================================================
--- trunk/neo/client/handlers/__init__.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/__init__.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,8 +15,8 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-from neo.handler import EventHandler
-from neo.protocol import ProtocolError
+from neo.lib.handler import EventHandler
+from neo.lib.protocol import ProtocolError
class BaseHandler(EventHandler):
"""Base class for client-side EventHandler implementations."""
Modified: trunk/neo/client/handlers/master.py
==============================================================================
--- trunk/neo/client/handlers/master.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/master.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,12 +15,12 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-import neo
+import neo.lib
from neo.client.handlers import BaseHandler, AnswerBaseHandler
-from neo.pt import MTPartitionTable as PartitionTable
-from neo.protocol import NodeTypes, NodeStates, ProtocolError
-from neo.util import dump
+from neo.lib.pt import MTPartitionTable as PartitionTable
+from neo.lib.protocol import NodeTypes, NodeStates, ProtocolError
+from neo.lib.util import dump
from neo.client.exception import NEOStorageError
class PrimaryBootstrapHandler(AnswerBaseHandler):
@@ -43,7 +43,7 @@ class PrimaryBootstrapHandler(AnswerBase
if your_uuid is None:
raise ProtocolError('No UUID supplied')
app.uuid = your_uuid
- neo.logging.info('Got an UUID: %s', dump(app.uuid))
+ neo.lib.logging.info('Got an UUID: %s', dump(app.uuid))
node = app.nm.getByAddress(conn.getAddress())
conn.setUUID(uuid)
@@ -66,7 +66,7 @@ class PrimaryBootstrapHandler(AnswerBase
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
- neo.logging.warning('Unknown primary master UUID: %s. ' \
+ neo.lib.logging.warning('Unknown primary master UUID: %s. ' \
'Ignoring.' % dump(primary_uuid))
else:
app.primary_master_node = primary_node
@@ -94,7 +94,7 @@ class PrimaryNotificationsHandler(BaseHa
def connectionClosed(self, conn):
app = self.app
- neo.logging.critical("connection to primary master node closed")
+ neo.lib.logging.critical("connection to primary master node closed")
conn.close()
app.master_conn = None
app.primary_master_node = None
@@ -104,19 +104,19 @@ class PrimaryNotificationsHandler(BaseHa
app = self.app
if app.master_conn is not None:
assert conn is app.master_conn
- neo.logging.critical("connection timeout to primary master node " \
- "expired")
+ neo.lib.logging.critical(
+ "connection timeout to primary master node expired")
BaseHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
app = self.app
if app.master_conn is not None:
assert conn is app.master_conn
- neo.logging.critical("primary master node is broken")
+ neo.lib.logging.critical("primary master node is broken")
BaseHandler.peerBroken(self, conn)
def stopOperation(self, conn):
- neo.logging.critical("master node ask to stop operation")
+ neo.lib.logging.critical("master node ask to stop operation")
def invalidateObjects(self, conn, tid, oid_list):
app = self.app
Modified: trunk/neo/client/handlers/storage.py
==============================================================================
--- trunk/neo/client/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/client/handlers/storage.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -18,10 +18,10 @@
from ZODB.TimeStamp import TimeStamp
from ZODB.POSException import ConflictError
-import neo
+import neo.lib
from neo.client.handlers import BaseHandler, AnswerBaseHandler
-from neo.protocol import NodeTypes, ProtocolError, LockState
-from neo.util import dump
+from neo.lib.protocol import NodeTypes, ProtocolError, LockState
+from neo.lib.util import dump
from neo.client.exception import NEOStorageError, NEOStorageNotFoundError
from neo.client.exception import NEOStorageDoesNotExistError
@@ -74,7 +74,7 @@ class StorageAnswersHandler(AnswerBaseHa
local_var = self.app.local_var
object_stored_counter_dict = local_var.object_stored_counter_dict[oid]
if conflicting:
- neo.logging.info('%r report a conflict for %r with %r', conn,
+ neo.lib.logging.info('%r report a conflict for %r with %r', conn,
dump(oid), dump(serial))
conflict_serial_dict = local_var.conflict_serial_dict
if serial in object_stored_counter_dict:
@@ -96,7 +96,7 @@ class StorageAnswersHandler(AnswerBaseHa
raise NEOStorageError('Wrong TID, transaction not started')
def answerTIDsFrom(self, conn, tid_list):
- neo.logging.debug('Get %d TIDs from %r', len(tid_list), conn)
+ neo.lib.logging.debug('Get %d TIDs from %r', len(tid_list), conn)
assert not self.app.local_var.tids_from.intersection(set(tid_list))
self.app.local_var.tids_from.update(tid_list)
@@ -144,7 +144,7 @@ class StorageAnswersHandler(AnswerBaseHa
raise ConflictError, 'Lock wait timeout for oid %s on %r' % (
dump(oid), conn)
elif status == LockState.GRANTED:
- neo.logging.info('Store of oid %s was successful, but after ' \
+ neo.lib.logging.info('Store of oid %s was successful, but after ' \
'timeout.', dump(oid))
# XXX: Not sure what to do in this case yet, for now do nothing.
else:
Modified: trunk/neo/client/iterator.py
==============================================================================
--- trunk/neo/client/iterator.py [iso-8859-1] (original)
+++ trunk/neo/client/iterator.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -18,7 +18,7 @@
from ZODB import BaseStorage
from zope.interface import implements
import ZODB.interfaces
-from neo.util import u64, add64
+from neo.lib.util import u64, add64
from neo.client.exception import NEOStorageCreationUndoneError
from neo.client.exception import NEOStorageNotFoundError
Modified: trunk/neo/client/poll.py
==============================================================================
--- trunk/neo/client/poll.py [iso-8859-1] (original)
+++ trunk/neo/client/poll.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -16,8 +16,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from threading import Thread, Event, enumerate as thread_enum
-from neo.locking import Lock
-import neo
+from neo.lib.locking import Lock
+import neo.lib
class _ThreadedPoll(Thread):
"""Polling thread."""
@@ -34,7 +34,7 @@ class _ThreadedPoll(Thread):
self._stop = Event()
def run(self):
- neo.logging.debug('Started %s', self)
+ neo.lib.logging.debug('Started %s', self)
while not self.stopping():
# First check if we receive any new message from other node
try:
@@ -42,8 +42,8 @@ class _ThreadedPoll(Thread):
# interrupt this call when stopping.
self.em.poll(1)
except:
- self.neo.logging.error('poll raised, retrying', exc_info=1)
- self.neo.logging.debug('Threaded poll stopped')
+ self.neo.lib.logging.error('poll raised, retrying', exc_info=1)
+ self.neo.lib.logging.debug('Threaded poll stopped')
self._stop.clear()
def stop(self):
@@ -110,7 +110,7 @@ def psThreadedPoll(log=None):
Logs alive ThreadedPoll threads.
"""
if log is None:
- log = neo.logging.debug
+ log = neo.lib.logging.debug
for thread in thread_enum():
if not isinstance(thread, ThreadedPoll):
continue
Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -18,12 +18,12 @@
import time
from random import shuffle
-import neo
-from neo.locking import RLock
-from neo.protocol import NodeTypes, Packets
-from neo.connection import MTClientConnection, ConnectionClosed
+import neo.lib
+from neo.lib.locking import RLock
+from neo.lib.protocol import NodeTypes, Packets
+from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.client.exception import NEOStorageError
-from neo.profiling import profiler_decorator
+from neo.lib.profiling import profiler_decorator
# How long before we might retry a connection to a node to which connection
# failed in the past.
@@ -61,7 +61,7 @@ class ConnectionPool(object):
assert addr is not None
app = self.app
app.setNodeReady()
- neo.logging.debug('trying to connect to %s - %s', node,
+ neo.lib.logging.debug('trying to connect to %s - %s', node,
node.getState())
conn = MTClientConnection(app.em, app.storage_event_handler, addr,
connector=app.connector_handler(), dispatcher=app.dispatcher)
@@ -70,7 +70,7 @@ class ConnectionPool(object):
try:
if conn.getConnector() is None:
# This happens, if a connection could not be established.
- neo.logging.error('Connection to %r failed', node)
+ neo.lib.logging.error('Connection to %r failed', node)
self.notifyFailure(node)
return None
@@ -84,15 +84,15 @@ class ConnectionPool(object):
app._waitMessage(conn, msg_id,
handler=app.storage_bootstrap_handler)
except ConnectionClosed:
- neo.logging.error('Connection to %r failed', node)
+ neo.lib.logging.error('Connection to %r failed', node)
self.notifyFailure(node)
return None
if app.isNodeReady():
- neo.logging.info('Connected %r', node)
+ neo.lib.logging.info('Connected %r', node)
return conn
else:
- neo.logging.info('%r not ready', node)
+ neo.lib.logging.info('%r not ready', node)
self.notifyFailure(node)
return NOT_READY
@@ -107,7 +107,7 @@ class ConnectionPool(object):
not self.app.dispatcher.registered(conn):
del self.connection_dict[conn.getUUID()]
conn.close()
- neo.logging.debug('_dropConnections : connection to ' \
+ neo.lib.logging.debug('_dropConnections : connection to ' \
'storage node %s:%d closed', *(conn.getAddress()))
if len(self.connection_dict) <= self.max_pool_size:
break
Removed: trunk/neo/config.py
==============================================================================
--- trunk/neo/config.py [iso-8859-1] (original)
+++ trunk/neo/config.py (removed)
@@ -1,93 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-from ConfigParser import SafeConfigParser
-from neo import util
-
-
-class ConfigurationManager(object):
- """
- Configuration manager that loads options from a configuration file and
- command line arguments
- """
-
- def __init__(self, defaults, config_file, section, argument_list):
- self.defaults = defaults
- self.argument_list = argument_list
- self.parser = None
- if config_file is not None:
- self.parser = SafeConfigParser(defaults)
- self.parser.read(config_file)
- self.section = section
-
- def __get(self, key, optional=False):
- value = self.argument_list.get(key)
- if value is None:
- if self.parser is None:
- value = self.defaults.get(key)
- else:
- value = self.parser.get(self.section, key)
- if value is None and not optional:
- raise RuntimeError("Option '%s' is undefined'" % (key, ))
- return value
-
- def getMasters(self):
- """ Get the master node list except itself """
- masters = self.__get('masters')
- if not masters:
- return []
- # load master node list except itself
- return util.parseMasterList(masters, except_node=self.getBind())
-
- def getBind(self):
- """ Get the address to bind to """
- bind = self.__get('bind')
- if ':' in bind:
- (ip, port) = bind.split(':')
- else:
- (ip, port) = (bind, 0)
- ip = util.resolve(ip)
- return (ip, int(port))
-
- def getDatabase(self):
- return self.__get('database')
-
- def getAdapter(self):
- return self.__get('adapter')
-
- def getCluster(self):
- cluster = self.__get('cluster')
- assert cluster != '', "Cluster name must be non-empty"
- return cluster
-
- def getName(self):
- return self.__get('name')
-
- def getReplicas(self):
- return int(self.__get('replicas'))
-
- def getPartitions(self):
- return int(self.__get('partitions'))
-
- def getReset(self):
- # only from command line
- return self.argument_list.get('reset', False)
-
- def getUUID(self):
- # only from command line
- return util.bin(self.argument_list.get('uuid', None))
-
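For reference, the address parsing done by the removed ConfigurationManager.getBind can be sketched on its own as follows. This is a minimal sketch rather than the NEO code: parse_bind is a hypothetical name, and socket.gethostbyname is only an assumed stand-in for neo's util.resolve, whose implementation is not part of this diff.

```python
import socket


def parse_bind(bind, resolve=socket.gethostbyname):
    """Split 'host[:port]' into an (ip, port) tuple.

    Hypothetical helper mirroring ConfigurationManager.getBind; `resolve`
    is an assumed stand-in for neo's util.resolve.
    """
    if ':' in bind:
        # Like the original, this expects exactly one colon (IPv4 address
        # or host name); a second colon makes the unpacking raise ValueError.
        host, port = bind.split(':')
    else:
        # No port given: fall back to 0, as the original does.
        host, port = bind, 0
    return resolve(host), int(port)


# Usage, with an identity resolver so that no name lookup is performed.
print(parse_bind('127.0.0.1:10000', resolve=lambda host: host))
print(parse_bind('127.0.0.1', resolve=lambda host: host))
```

Passing the resolver as a parameter is a choice made for the sketch only, so the parsing can be exercised without touching the network.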
Removed: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py (removed)
@@ -1,794 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-from time import time
-
-import neo
-from neo.locking import RLock
-
-from neo.protocol import PacketMalformedError, Packets, ParserState
-from neo.connector import ConnectorException, ConnectorTryAgainException, \
- ConnectorInProgressException, ConnectorConnectionRefusedException, \
- ConnectorConnectionClosedException
-from neo.util import dump
-from neo.logger import PACKET_LOGGER
-
-from neo import attributeTracker
-from neo.util import ReadBuffer
-from neo.profiling import profiler_decorator
-
-PING_DELAY = 6
-PING_TIMEOUT = 5
-INCOMING_TIMEOUT = 10
-CRITICAL_TIMEOUT = 30
-
-class ConnectionClosed(Exception):
- pass
-
-def not_closed(func):
- def decorator(self, *args, **kw):
- if self.connector is None:
- raise ConnectorConnectionClosedException
- return func(self, *args, **kw)
- return decorator
-
-
-def lockCheckWrapper(func):
- """
- This function is to be used as a wrapper around
- MT(Client|Server)Connection class methods.
-
- It uses a "_" method on RLock class, so it might stop working without
- notice (sadly, RLock does not offer any "acquired" method, but that one
- will do as it checks that current thread holds this lock).
-
- It requires the monitored class to have an RLock instance in self._lock
- property.
- """
- def wrapper(self, *args, **kw):
- if not self._lock._is_owned():
- import traceback
- neo.logging.warning('%s called on %s instance without being ' \
- 'locked. Stack:\n%s', func.func_code.co_name,
- self.__class__.__name__, ''.join(traceback.format_stack()))
- # Call anyway
- return func(self, *args, **kw)
- return wrapper
-
-class OnTimeout(object):
- """
- Simple helper class for on_timeout parameter used in HandlerSwitcher
- class.
- """
- def __init__(self, func, *args, **kw):
- self.func = func
- self.args = args
- self.kw = kw
-
- def __call__(self, conn, msg_id):
- return self.func(conn, msg_id, *self.args, **self.kw)
-
-class HandlerSwitcher(object):
- _next_timeout = None
- _next_timeout_msg_id = None
- _next_on_timeout = None
-
- def __init__(self, handler):
- # pending handlers and related requests
- self._pending = [[{}, handler]]
- self._is_handling = False
-
- def clear(self):
- handler = self._pending[0][1]
- self._pending = [[{}, handler]]
-
- def isPending(self):
- return bool(self._pending[0][0])
-
- def getHandler(self):
- return self._pending[0][1]
-
- def getLastHandler(self):
- """ Return the last (may be unapplied) handler registered """
- return self._pending[-1][1]
-
- @profiler_decorator
- def emit(self, request, timeout, on_timeout):
- # register the request in the current handler
- _pending = self._pending
- if self._is_handling:
- # If this is called while handling a packet, the response is to
- # be expected for the current handler...
- (request_dict, _) = _pending[0]
- else:
- # ...otherwise, queue for the latest handler
- assert len(_pending) == 1 or _pending[0][0]
- (request_dict, _) = _pending[-1]
- msg_id = request.getId()
- answer_class = request.getAnswerClass()
- assert answer_class is not None, "Not a request"
- assert msg_id not in request_dict, "Packet id already expected"
- next_timeout = self._next_timeout
- if next_timeout is None or timeout < next_timeout:
- self._next_timeout = timeout
- self._next_timeout_msg_id = msg_id
- self._next_on_timeout = on_timeout
- request_dict[msg_id] = (answer_class, timeout, on_timeout)
-
- def checkTimeout(self, connection, t):
- next_timeout = self._next_timeout
- if next_timeout is not None and next_timeout < t:
- msg_id = self._next_timeout_msg_id
- if self._next_on_timeout is None:
- result = msg_id
- else:
- if self._next_on_timeout(connection, msg_id):
- # Don't notify that a timeout occurred, and forget about
- # this answer.
- for (request_dict, _) in self._pending:
- request_dict.pop(msg_id, None)
- self._updateNextTimeout()
- result = None
- else:
- # Notify that a timeout occurred
- result = msg_id
- else:
- result = None
- return result
-
- def handle(self, connection, packet):
- assert not self._is_handling
- self._is_handling = True
- try:
- self._handle(connection, packet)
- finally:
- self._is_handling = False
-
- @profiler_decorator
- def _handle(self, connection, packet):
- assert len(self._pending) == 1 or self._pending[0][0]
- PACKET_LOGGER.dispatch(connection, packet, 'from')
- if connection.isClosed() and packet.ignoreOnClosedConnection():
- neo.logging.debug('Ignoring packet %r on closed connection %r',
- packet, connection)
- return
- msg_id = packet.getId()
- (request_dict, handler) = self._pending[0]
- # notifications are not expected
- if not packet.isResponse():
- handler.packetReceived(connection, packet)
- return
- # checkout the expected answer class
- (klass, timeout, _) = request_dict.pop(msg_id, (None, None, None))
- if klass and isinstance(packet, klass) or packet.isError():
- handler.packetReceived(connection, packet)
- else:
- neo.logging.error('Unexpected answer %r in %r', packet, connection)
- notification = Packets.Notify('Unexpected answer: %r' % packet)
- try:
- connection.notify(notification)
- except ConnectorConnectionClosedException:
- pass
- connection.abort()
- handler.peerBroken(connection)
- # apply a pending handler if no more answers are pending
- while len(self._pending) > 1 and not self._pending[0][0]:
- del self._pending[0]
- neo.logging.debug('Apply handler %r on %r', self._pending[0][1],
- connection)
- if timeout == self._next_timeout:
- self._updateNextTimeout()
-
- def _updateNextTimeout(self):
- # Find next timeout and its msg_id
- timeout_list = []
- extend = timeout_list.extend
- for (request_dict, handler) in self._pending:
- extend(((timeout, msg_id, on_timeout) \
- for msg_id, (_, timeout, on_timeout) in \
- request_dict.iteritems()))
- if timeout_list:
- timeout_list.sort(key=lambda x: x[0])
- self._next_timeout, self._next_timeout_msg_id, \
- self._next_on_timeout = timeout_list[0]
- else:
- self._next_timeout, self._next_timeout_msg_id, \
- self._next_on_timeout = None, None, None
-
- @profiler_decorator
- def setHandler(self, handler):
- can_apply = len(self._pending) == 1 and not self._pending[0][0]
- if can_apply:
- # nothing is pending, change immediately
- self._pending[0][1] = handler
- else:
- # put the next handler in queue
- self._pending.append([{}, handler])
- return can_apply
-
-
-class Timeout(object):
- """ Keep track of connection-level timeouts """
-
- def __init__(self):
- self._ping_time = None
- self._critical_time = None
-
- def update(self, t, force=False):
- """
- Send occurred:
- - set ping time if earlier than existing one
- """
- ping_time = self._ping_time
- t += PING_DELAY
- if force or ping_time is None or t < ping_time:
- self._ping_time = t
-
- def refresh(self, t):
- """
- Recv occurred:
- - reschedule next ping time
- - as this is evidence that the node is alive, remove pong expectation
- """
- self._ping_time = t + PING_DELAY
- self._critical_time = None
-
- def ping(self, t):
- """
- Ping send occurred:
- - reschedule next ping time
- - set pong expectation
- """
- self._ping_time = t + PING_DELAY
- self._critical_time = t + PING_TIMEOUT
-
- def softExpired(self, t):
- """ Do we need to ping ? """
- return self._ping_time < t
-
- def hardExpired(self, t):
- """ Have we reached pong latest arrival time, if set ? """
- critical_time = self._critical_time
- return critical_time is not None and critical_time < t
-
-
-class BaseConnection(object):
- """A base connection."""
-
- def __init__(self, event_manager, handler, connector, addr=None):
- assert connector is not None, "Need a low-level connector"
- self.em = event_manager
- self.connector = connector
- self.addr = addr
- self._handlers = HandlerSwitcher(handler)
- self._timeout = Timeout()
- event_manager.register(self)
-
- def isPending(self):
- return self._handlers.isPending()
-
- def checkTimeout(self, t):
- handlers = self._handlers
- if handlers.isPending():
- msg_id = handlers.checkTimeout(self, t)
- if msg_id is not None:
- neo.logging.info('timeout for #0x%08x with %r', msg_id, self)
- self.close()
- self.getHandler().timeoutExpired(self)
- elif self._timeout.hardExpired(t):
- # critical time reached or pong not received, abort
- neo.logging.info('timeout with %r', self)
- self.notify(Packets.Notify('Timeout'))
- self.abort()
- self.getHandler().timeoutExpired(self)
- elif self._timeout.softExpired(t):
- self._timeout.ping(t)
- self.ping()
-
- def lock(self):
- return 1
-
- def unlock(self):
- return None
-
- def getConnector(self):
- return self.connector
-
- def getAddress(self):
- return self.addr
-
- def readable(self):
- raise NotImplementedError
-
- def writable(self):
- raise NotImplementedError
-
- def close(self):
- """Close the connection."""
- if self.connector is not None:
- em = self.em
- em.removeReader(self)
- em.removeWriter(self)
- em.unregister(self)
- self.connector.shutdown()
- self.connector.close()
- self.connector = None
-
- def __repr__(self):
- address = self.addr and '%s:%d' % self.addr or '?'
- return '<%s(uuid=%s, address=%s, closed=%s) at %x>' % (
- self.__class__.__name__,
- dump(self.getUUID()),
- address,
- self.isClosed(),
- id(self),
- )
-
- __del__ = close
-
- def getHandler(self):
- return self._handlers.getHandler()
-
- def setHandler(self, handler):
- if self._handlers.setHandler(handler):
- neo.logging.debug('Set handler %r on %r', handler, self)
- else:
- neo.logging.debug('Delay handler %r on %r', handler, self)
-
- def getEventManager(self):
- return self.em
-
- def getUUID(self):
- return None
-
- def isClosed(self):
- return self.connector is None
-
- def isAborted(self):
- return False
-
- def isListening(self):
- return False
-
- def isServer(self):
- return False
-
- def isClient(self):
- return False
-
- def hasPendingMessages(self):
- return False
-
- def whoSetConnector(self):
- """
- Debugging method: call this method to know who set the current
- connector value.
- """
- return attributeTracker.whoSet(self, 'connector')
-
-attributeTracker.track(BaseConnection)
-
-class ListeningConnection(BaseConnection):
- """A listen connection."""
-
- def __init__(self, event_manager, handler, addr, connector, **kw):
- neo.logging.debug('listening to %s:%d', *addr)
- BaseConnection.__init__(self, event_manager, handler,
- addr=addr, connector=connector)
- self.connector.makeListeningConnection(addr)
- self.em.addReader(self)
-
- def readable(self):
- try:
- new_s, addr = self.connector.getNewConnection()
- neo.logging.debug('accepted a connection from %s:%d', *addr)
- handler = self.getHandler()
- new_conn = ServerConnection(self.getEventManager(), handler,
- connector=new_s, addr=addr)
- # A request for a node identification should arrive.
- self._timeout.update(time())
- handler.connectionAccepted(new_conn)
- except ConnectorTryAgainException:
- pass
-
- def getAddress(self):
- return self.connector.getAddress()
-
- def writable(self):
- return False
-
- def isListening(self):
- return True
-
-
-class Connection(BaseConnection):
- """A connection."""
-
- def __init__(self, event_manager, handler, connector, addr=None):
- BaseConnection.__init__(self, event_manager, handler,
- connector=connector, addr=addr)
- self.read_buf = ReadBuffer()
- self.write_buf = []
- self.cur_id = 0
- self.peer_id = 0
- self.aborted = False
- self.uuid = None
- self._queue = []
- self._on_close = None
- self._parser_state = ParserState()
- event_manager.addReader(self)
-
- def setOnClose(self, callback):
- assert self._on_close is None
- self._on_close = callback
-
- def isAborted(self):
- return self.aborted
-
- def getUUID(self):
- return self.uuid
-
- def setUUID(self, uuid):
- self.uuid = uuid
-
- def setPeerId(self, peer_id):
- self.peer_id = peer_id
-
- def getPeerId(self):
- return self.peer_id
-
- @profiler_decorator
- def _getNextId(self):
- next_id = self.cur_id
- self.cur_id = (next_id + 1) & 0xffffffff
- return next_id
-
- def close(self):
- neo.logging.debug('closing a connector for %r', self)
- BaseConnection.close(self)
- if self._on_close is not None:
- self._on_close()
- self._on_close = None
- del self.write_buf[:]
- self.read_buf.clear()
- self._handlers.clear()
-
- def abort(self):
- """Abort dealing with this connection."""
- neo.logging.debug('aborting a connector for %r', self)
- self.aborted = True
-
- def writable(self):
- """Called when self is writable."""
- self._send()
- if not self.write_buf and self.connector is not None:
- if self.aborted:
- self.close()
- else:
- self.em.removeWriter(self)
-
- def readable(self):
- """Called when self is readable."""
- self._recv()
- self.analyse()
-
- if self.aborted:
- self.em.removeReader(self)
-
- def analyse(self):
- """Analyse received data."""
- while True:
- # parse a packet
- try:
- packet = Packets.parse(self.read_buf, self._parser_state)
- if packet is None:
- break
- except PacketMalformedError, msg:
- self.getHandler()._packetMalformed(self, msg)
- return
- self._timeout.refresh(time())
- packet_type = packet.getType()
- if packet_type == Packets.Ping:
- # Send a pong notification
- PACKET_LOGGER.dispatch(self, packet, 'from')
- self.answer(Packets.Pong(), packet.getId())
- elif packet_type == Packets.Pong:
- # Skip PONG packets: their only purpose is to refresh the timeout
- # generated upon ping. But still log them.
- PACKET_LOGGER.dispatch(self, packet, 'from')
- else:
- self._queue.append(packet)
-
- def hasPendingMessages(self):
- """
- Returns True if there are messages queued and awaiting processing.
- """
- return len(self._queue) != 0
-
- def process(self):
- """
- Process a pending packet.
- """
- # check out packet and process it with current handler
- packet = self._queue.pop(0)
- self._handlers.handle(self, packet)
-
- def pending(self):
- return self.connector is not None and self.write_buf
-
- def _closure(self, was_connected=True):
- assert self.connector is not None, self.whoSetConnector()
- # process the network events with the last registered handler to
- # solve issues where a node is lost with pending handlers and
- # create unexpected side effects.
- # XXX: This solution is being tested and should be approved or reverted
- handler = self._handlers.getLastHandler()
- self.close()
- if was_connected:
- handler.connectionClosed(self)
- else:
- handler.connectionFailed(self)
-
- @profiler_decorator
- def _recv(self):
- """Receive data from a connector."""
- try:
- data = self.connector.receive()
- except ConnectorTryAgainException:
- pass
- except ConnectorConnectionRefusedException:
- # should only occur while connecting
- self._closure(was_connected=False)
- except ConnectorConnectionClosedException:
- # connection reset by peer; according to the man page, this error
- # should not occur, but it seems that it does
- neo.logging.debug('Connection reset by peer: %r', self.connector)
- self._closure()
- except:
- neo.logging.debug('Unknown connection error: %r', self.connector)
- self._closure()
- # unhandled connector exception
- raise
- else:
- if not data:
- neo.logging.debug('Connection %r closed in recv',
- self.connector)
- self._closure()
- return
- self.read_buf.append(data)
-
- @profiler_decorator
- def _send(self):
- """Send data to a connector."""
- if not self.write_buf:
- return
- msg = ''.join(self.write_buf)
- try:
- n = self.connector.send(msg)
- except ConnectorTryAgainException:
- pass
- except ConnectorConnectionClosedException:
- # connection reset by peer
- neo.logging.debug('Connection reset by peer: %r', self.connector)
- self._closure()
- except:
- neo.logging.debug('Unknown connection error: %r', self.connector)
- # unhandled connector exception
- self._closure()
- raise
- else:
- if not n:
- neo.logging.debug('Connection %r closed in send',
- self.connector)
- self._closure()
- return
- if n == len(msg):
- del self.write_buf[:]
- else:
- self.write_buf = [msg[n:]]
-
- @profiler_decorator
- def _addPacket(self, packet):
- """Add a packet into the write buffer."""
- if self.connector is None:
- return
-
- was_empty = not bool(self.write_buf)
-
- self.write_buf.extend(packet.encode())
-
- if was_empty:
- # enable polling for writing.
- self.em.addWriter(self)
- PACKET_LOGGER.dispatch(self, packet, ' to ')
-
- @not_closed
- def notify(self, packet):
- """ Send a packet with a new ID """
- msg_id = self._getNextId()
- packet.setId(msg_id)
- self._addPacket(packet)
- return msg_id
-
- @profiler_decorator
- @not_closed
- def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None):
- """
- Send a packet with a new ID and register the expectation of an answer
- """
- msg_id = self._getNextId()
- packet.setId(msg_id)
- self._addPacket(packet)
- t = time()
- # If there is no pending request, initialise timeout values.
- if not self._handlers.isPending():
- self._timeout.update(t, force=True)
- self._handlers.emit(packet, t + timeout, on_timeout)
- return msg_id
-
- @not_closed
- def answer(self, packet, msg_id=None):
- """ Answer to a packet by re-using its ID for the packet answer """
- if msg_id is None:
- msg_id = self.getPeerId()
- packet.setId(msg_id)
- assert packet.isResponse(), packet
- self._addPacket(packet)
-
- @not_closed
- def ping(self):
- packet = Packets.Ping()
- packet.setId(self._getNextId())
- self._addPacket(packet)
-
-
-class ClientConnection(Connection):
- """A connection from this node to a remote node."""
-
- def __init__(self, event_manager, handler, addr, connector, **kw):
- self.connecting = True
- Connection.__init__(self, event_manager, handler, addr=addr,
- connector=connector)
- handler.connectionStarted(self)
- try:
- try:
- self.connector.makeClientConnection(addr)
- except ConnectorInProgressException:
- event_manager.addWriter(self)
- else:
- self.connecting = False
- self.getHandler().connectionCompleted(self)
- except ConnectorConnectionRefusedException:
- self._closure(was_connected=False)
- except ConnectorException:
- # unhandled connector exception
- self._closure(was_connected=False)
- raise
-
- def writable(self):
- """Called when self is writable."""
- if self.connecting:
- err = self.connector.getError()
- if err:
- self._closure(was_connected=False)
- return
- else:
- self.connecting = False
- self.getHandler().connectionCompleted(self)
- self.em.addReader(self)
- else:
- Connection.writable(self)
-
- def isClient(self):
- return True
-
-
-class ServerConnection(Connection):
- """A connection from a remote node to this node."""
-
- def isServer(self):
- return True
-
-
-class MTClientConnection(ClientConnection):
- """A Multithread-safe version of ClientConnection."""
-
- def __init__(self, *args, **kwargs):
- # _lock is only here for lock debugging purposes. Do not use.
- self._lock = lock = RLock()
- self.acquire = lock.acquire
- self.release = lock.release
- self.dispatcher = kwargs.pop('dispatcher')
- self.dispatcher.needPollThread()
- self.lock()
- try:
- super(MTClientConnection, self).__init__(*args, **kwargs)
- finally:
- self.unlock()
-
- def lock(self, blocking = 1):
- return self.acquire(blocking = blocking)
-
- def unlock(self):
- self.release()
-
- @lockCheckWrapper
- def writable(self, *args, **kw):
- return super(MTClientConnection, self).writable(*args, **kw)
-
- @lockCheckWrapper
- def readable(self, *args, **kw):
- return super(MTClientConnection, self).readable(*args, **kw)
-
- @lockCheckWrapper
- def analyse(self, *args, **kw):
- return super(MTClientConnection, self).analyse(*args, **kw)
-
- def notify(self, *args, **kw):
- self.lock()
- try:
- return super(MTClientConnection, self).notify(*args, **kw)
- finally:
- self.unlock()
-
- @profiler_decorator
- def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None,
- queue=None):
- self.lock()
- try:
- if self.isClosed():
- raise ConnectionClosed
- # XXX: Here, we duplicate Connection.ask because we need to call
- # self.dispatcher.register after setId is called and before
- # _addPacket is called.
- msg_id = self._getNextId()
- packet.setId(msg_id)
- if queue is None:
- if not isinstance(packet, Packets.Ping):
- raise TypeError, 'Only Ping packet can be asked ' \
- 'without a queue, got a %r.' % (packet, )
- else:
- self.dispatcher.register(self, msg_id, queue)
- self._addPacket(packet)
- t = time()
- # If there is no pending request, initialise timeout values.
- if not self._handlers.isPending():
- self._timeout.update(t)
- self._handlers.emit(packet, t + timeout, on_timeout)
- return msg_id
- finally:
- self.unlock()
-
- @lockCheckWrapper
- def answer(self, *args, **kw):
- return super(MTClientConnection, self).answer(*args, **kw)
-
- @lockCheckWrapper
- def checkTimeout(self, *args, **kw):
- return super(MTClientConnection, self).checkTimeout(*args, **kw)
-
- def close(self):
- self.lock()
- try:
- super(MTClientConnection, self).close()
- finally:
- self.release()
-
- @lockCheckWrapper
- def process(self, *args, **kw):
- return super(MTClientConnection, self).process(*args, **kw)
-
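The ping/pong bookkeeping of the Timeout class in the file above can be followed with concrete timestamps. The block below is a standalone sketch under stated assumptions, not the NEO implementation: TimeoutSketch and its snake_case method names are hypothetical, the update() method is omitted, and an unset ping time is treated as "no ping due". The Python 2 original behaves differently on that last point, since None compares lower than any number there and softExpired would return True.

```python
PING_DELAY = 6     # seconds of silence before a ping is sent
PING_TIMEOUT = 5   # seconds allowed for the pong to arrive


class TimeoutSketch(object):
    """Standalone restatement of the connection-level Timeout logic.

    Assumption: an unset ping time means that no ping is due.
    """

    def __init__(self):
        self._ping_time = None
        self._critical_time = None

    def refresh(self, t):
        # Data received: the peer is alive, so drop any pong expectation.
        self._ping_time = t + PING_DELAY
        self._critical_time = None

    def ping(self, t):
        # Ping sent: schedule the next one and start waiting for the pong.
        self._ping_time = t + PING_DELAY
        self._critical_time = t + PING_TIMEOUT

    def soft_expired(self, t):
        # Is it time to send a ping?
        return self._ping_time is not None and self._ping_time < t

    def hard_expired(self, t):
        # Has the latest acceptable pong arrival time passed?
        return self._critical_time is not None and self._critical_time < t


timeout = TimeoutSketch()
timeout.refresh(100)                   # last packet received at t=100
assert not timeout.soft_expired(105)   # next ping is due after t=106
assert timeout.soft_expired(107)       # silence lasted long enough: ping
timeout.ping(107)                      # pong expected by t=112
assert not timeout.hard_expired(111)
assert timeout.hard_expired(113)       # no pong in time
timeout.refresh(113)                   # any received data clears the expectation
assert not timeout.hard_expired(120)
```

In BaseConnection.checkTimeout, the hard expiry is what leads to the 'Timeout' notification and abort(), while the soft expiry only triggers a ping.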
Removed: trunk/neo/connector.py
==============================================================================
--- trunk/neo/connector.py [iso-8859-1] (original)
+++ trunk/neo/connector.py (removed)
@@ -1,185 +0,0 @@
-#
-# Copyright (C) 2009-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-import socket
-import errno
-
-# Global connector registry.
-# Fill by calling registerConnectorHandler.
-# Read by calling getConnectorHandler.
-connector_registry = {}
-DEFAULT_CONNECTOR = 'SocketConnector'
-
-def registerConnectorHandler(connector_handler):
- connector_registry[connector_handler.__name__] = connector_handler
-
-def getConnectorHandler(connector=None):
- if connector is None:
- connector = DEFAULT_CONNECTOR
- if isinstance(connector, basestring):
- connector_handler = connector_registry.get(connector)
- else:
- # Allow to directly provide a handler class without requiring to
- # register it first.
- connector_handler = connector
- return connector_handler
-
-class SocketConnector:
- """ This class is a wrapper for a socket """
-
- is_listening = False
- remote_addr = None
- is_closed = None
-
- def __init__(self, s=None, accepted_from=None):
- self.accepted_from = accepted_from
- if accepted_from is not None:
- self.remote_addr = accepted_from
- self.is_listening = False
- self.is_closed = False
- if s is None:
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- else:
- self.socket = s
- self.socket_fd = self.socket.fileno()
- # always use non-blocking sockets
- self.socket.setblocking(0)
- # disable Nagle algorithm to reduce latency
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-
- def makeClientConnection(self, addr):
- self.is_closed = False
- self.remote_addr = addr
- try:
- self.socket.connect(addr)
- except socket.error, (err, errmsg):
- if err == errno.EINPROGRESS:
- raise ConnectorInProgressException
- if err == errno.ECONNREFUSED:
- raise ConnectorConnectionRefusedException
- raise ConnectorException, 'makeClientConnection to %s failed:' \
- ' %s:%s' % (addr, err, errmsg)
-
- def makeListeningConnection(self, addr):
- self.is_closed = False
- self.is_listening = True
- try:
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.socket.bind(addr)
- self.socket.listen(5)
- except socket.error, (err, errmsg):
- self.socket.close()
- raise ConnectorException, 'makeListeningConnection on %s failed:' \
- ' %s:%s' % (addr, err, errmsg)
-
- def getError(self):
- return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
-
- def getAddress(self):
- return self.socket.getsockname()
-
- def getDescriptor(self):
- # this descriptor must only be used by the event manager, where it
- # is guaranteed to be unique only while the connector is open and
- # registered in epoll
- return self.socket_fd
-
- def getNewConnection(self):
- try:
- new_s, addr = self.socket.accept()
- new_s = SocketConnector(new_s, accepted_from=addr)
- return new_s, addr
- except socket.error, (err, errmsg):
- if err == errno.EAGAIN:
- raise ConnectorTryAgainException
- raise ConnectorException, 'getNewConnection failed: %s:%s' % \
- (err, errmsg)
-
- def shutdown(self):
- # This may fail if the socket is not connected.
- try:
- self.socket.shutdown(socket.SHUT_RDWR)
- except socket.error:
- pass
-
- def receive(self):
- try:
- return self.socket.recv(4096)
- except socket.error, (err, errmsg):
- if err == errno.EAGAIN:
- raise ConnectorTryAgainException
- if err in (errno.ECONNREFUSED, errno.EHOSTUNREACH):
- raise ConnectorConnectionRefusedException
- if err in (errno.ECONNRESET, errno.ETIMEDOUT):
- raise ConnectorConnectionClosedException
- raise ConnectorException, 'receive failed: %s:%s' % (err, errmsg)
-
- def send(self, msg):
- try:
- return self.socket.send(msg)
- except socket.error, (err, errmsg):
- if err == errno.EAGAIN:
- raise ConnectorTryAgainException
- if err in (errno.ECONNRESET, errno.ETIMEDOUT, errno.EPIPE):
- raise ConnectorConnectionClosedException
- raise ConnectorException, 'send failed: %s:%s' % (err, errmsg)
-
- def close(self):
- self.is_closed = True
- return self.socket.close()
-
- def __repr__(self):
- if self.is_closed:
- fileno = '?'
- else:
- fileno = self.socket_fd
- result = '<%s at 0x%x fileno %s %s>' % (self.__class__.__name__,
- id(self), fileno, self.socket.getsockname())
- if self.is_closed is None:
- result += 'never opened'
- else:
- if self.is_closed:
- result += 'closed '
- else:
- result += 'opened '
- if self.is_listening:
- result += 'listening'
- else:
- if self.accepted_from is None:
- result += 'to'
- else:
- result += 'from'
- result += ' %s' % (self.remote_addr, )
- return result + '>'
-
-registerConnectorHandler(SocketConnector)
-
-class ConnectorException(Exception):
- pass
-
-class ConnectorTryAgainException(ConnectorException):
- pass
-
-class ConnectorInProgressException(ConnectorException):
- pass
-
-class ConnectorConnectionClosedException(ConnectorException):
- pass
-
-class ConnectorConnectionRefusedException(ConnectorException):
- pass
-
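The registry at the top of connector.py resolves a connector either by its registered class name or by accepting a class directly. A minimal sketch of that lookup follows, written for Python 3 (the removed file targets Python 2 and tests against basestring). DummyConnector and UnregisteredConnector are hypothetical stand-ins, and the snake_case function names are illustrative, not NEO's API:

```python
# Sketch of the name-or-class lookup; not the NEO implementation.
connector_registry = {}
DEFAULT_CONNECTOR = 'DummyConnector'  # assumed default, for illustration only


def register_connector_handler(handler_class):
    """Store the class under its own name and return it unchanged."""
    connector_registry[handler_class.__name__] = handler_class
    return handler_class


def get_connector_handler(connector=None):
    """Resolve a connector given a name, a class, or nothing at all."""
    if connector is None:
        connector = DEFAULT_CONNECTOR
    if isinstance(connector, str):
        # Unknown names yield None, as dict.get does in the original.
        return connector_registry.get(connector)
    # Anything else is assumed to be a handler class and passed through.
    return connector


@register_connector_handler
class DummyConnector:
    """Hypothetical connector registered under its class name."""


class UnregisteredConnector:
    """Hypothetical connector that is never registered."""
```

Passing the class itself skips the registry entirely, which is what lets callers supply a custom connector without a prior registration step.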
Removed: trunk/neo/dispatcher.py
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/dispatcher.py (removed)
@@ -1,143 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-from neo.locking import Lock
-from neo.profiling import profiler_decorator
-EMPTY = {}
-NOBODY = []
-
-class ForgottenPacket(object):
- """
- Instances of this class will be pushed to queue when an expected answer
- is being forgotten. Its purpose is similar to pushing "None" when
- connection is closed, but the meaning is different.
- """
- def __init__(self, msg_id):
- self.msg_id = msg_id
-
- def getId(self):
- return self.msg_id
-
-def giant_lock(func):
- def wrapped(self, *args, **kw):
- self.lock_acquire()
- try:
- return func(self, *args, **kw)
- finally:
- self.lock_release()
- return wrapped
-
-class Dispatcher:
- """Register a packet, connection pair as expecting a response packet."""
-
- def __init__(self, poll_thread=None):
- self.message_table = {}
- self.queue_dict = {}
- lock = Lock()
- self.lock_acquire = lock.acquire
- self.lock_release = lock.release
- self.poll_thread = poll_thread
-
- @giant_lock
- @profiler_decorator
- def dispatch(self, conn, msg_id, packet):
- """
- Retrieve register-time provided queue, and put conn and packet in it.
- """
- queue = self.message_table.get(id(conn), EMPTY).pop(msg_id, None)
- if queue is None:
- return False
- elif queue is NOBODY:
- return True
- self._decrefQueue(queue)
- queue.put((conn, packet))
- return True
-
- def _decrefQueue(self, queue):
- queue_id = id(queue)
- queue_dict = self.queue_dict
- if queue_dict[queue_id] == 1:
- queue_dict.pop(queue_id)
- else:
- queue_dict[queue_id] -= 1
-
- def _increfQueue(self, queue):
- queue_id = id(queue)
- queue_dict = self.queue_dict
- try:
- queue_dict[queue_id] += 1
- except KeyError:
- queue_dict[queue_id] = 1
-
- def needPollThread(self):
- self.poll_thread.start()
-
- @giant_lock
- @profiler_decorator
- def register(self, conn, msg_id, queue):
- """Register an expectation for a reply."""
- if self.poll_thread is not None:
- self.needPollThread()
- self.message_table.setdefault(id(conn), {})[msg_id] = queue
- self._increfQueue(queue)
-
- @profiler_decorator
- def unregister(self, conn):
- """ Unregister a connection and put a fake packet in queues to unlock
- threads expecting responses from that connection """
- self.lock_acquire()
- try:
- message_table = self.message_table.pop(id(conn), EMPTY)
- finally:
- self.lock_release()
- notified_set = set()
- _decrefQueue = self._decrefQueue
- for queue in message_table.itervalues():
- if queue is NOBODY:
- continue
- queue_id = id(queue)
- if queue_id not in notified_set:
- queue.put((conn, None))
- notified_set.add(queue_id)
- _decrefQueue(queue)
-
- @giant_lock
- @profiler_decorator
- def forget(self, conn, msg_id):
- """ Forget about a specific message for a specific connection.
- Actually makes it "expected by nobody", so we know we can ignore it,
- and not detect it as an error. """
- message_table = self.message_table[id(conn)]
- queue = message_table[msg_id]
- if queue is NOBODY:
- raise KeyError, 'Already expected by NOBODY: %r, %r' % (
- conn, msg_id)
- queue.put((conn, ForgottenPacket(msg_id)))
- self.queue_dict[id(queue)] -= 1
- message_table[msg_id] = NOBODY
- return queue
-
- @profiler_decorator
- def registered(self, conn):
- """Check if a connection is registered into message table."""
- return len(self.message_table.get(id(conn), EMPTY)) != 0
-
- @giant_lock
- @profiler_decorator
- def pending(self, queue):
- return not queue.empty() or self.queue_dict.get(id(queue), 0) > 0
-
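The Dispatcher above keeps two tables: one mapping each connection and message id to the queue awaiting the answer, and one counting how many answers each queue still expects. A reduced Python 3 sketch of that bookkeeping, under the assumption that a plain threading.Lock replaces neo.locking.Lock and that the ForgottenPacket notification and profiling decorators are left out; MiniDispatcher is a hypothetical name:

```python
import threading
from queue import Queue

NOBODY = object()  # marker: an answer is expected, but nobody waits for it


class MiniDispatcher:
    def __init__(self):
        self._lock = threading.Lock()
        self._table = {}     # id(conn) -> {msg_id: queue or NOBODY}
        self._refcount = {}  # id(queue) -> number of answers still expected

    def _decref(self, queue):
        key = id(queue)
        if self._refcount[key] == 1:
            del self._refcount[key]
        else:
            self._refcount[key] -= 1

    def register(self, conn, msg_id, queue):
        """Record that `queue` expects the answer to msg_id on conn."""
        with self._lock:
            self._table.setdefault(id(conn), {})[msg_id] = queue
            key = id(queue)
            self._refcount[key] = self._refcount.get(key, 0) + 1

    def dispatch(self, conn, msg_id, packet):
        """Deliver an answer; return False if it was never expected."""
        with self._lock:
            queue = self._table.get(id(conn), {}).pop(msg_id, None)
            if queue is None:
                return False
            if queue is NOBODY:
                return True  # expected, but deliberately ignored
            self._decref(queue)
            queue.put((conn, packet))
            return True

    def forget(self, conn, msg_id):
        """Mark an expected answer as expected by nobody."""
        with self._lock:
            per_conn = self._table[id(conn)]
            queue = per_conn[msg_id]
            if queue is NOBODY:
                raise KeyError('already expected by NOBODY: %r' % (msg_id,))
            self._decref(queue)
            per_conn[msg_id] = NOBODY
            return queue

    def pending(self, queue):
        """True while the queue holds data or still expects answers."""
        with self._lock:
            return not queue.empty() or self._refcount.get(id(queue), 0) > 0
```

The reference count is what lets a waiting thread tell an empty queue that will never be filled apart from one whose answers are still in flight.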
Removed: trunk/neo/epoll.py
==============================================================================
--- trunk/neo/epoll.py [iso-8859-1] (original)
+++ trunk/neo/epoll.py (removed)
@@ -1,136 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-r"""This is an epoll(4) interface available in Linux 2.6. This requires
-ctypes <http://python.net/crew/theller/ctypes/>."""
-
-from ctypes import cdll, Union, Structure, \
- c_void_p, c_int, byref
-try:
- from ctypes import c_uint32, c_uint64
-except ImportError:
- from ctypes import c_uint, c_ulonglong
- c_uint32 = c_uint
- c_uint64 = c_ulonglong
-from os import close
-from errno import EINTR, EAGAIN
-
-libc = cdll.LoadLibrary('libc.so.6')
-epoll_create = libc.epoll_create
-epoll_wait = libc.epoll_wait
-epoll_ctl = libc.epoll_ctl
-errno = c_int.in_dll(libc, 'errno')
-
-EPOLLIN = 0x001
-EPOLLPRI = 0x002
-EPOLLOUT = 0x004
-EPOLLRDNORM = 0x040
-EPOLLRDBAND = 0x080
-EPOLLWRNORM = 0x100
-EPOLLWRBAND = 0x200
-EPOLLMSG = 0x400
-EPOLLERR = 0x008
-EPOLLHUP = 0x010
-EPOLLONESHOT = (1 << 30)
-EPOLLET = (1 << 31)
-
-EPOLL_CTL_ADD = 1
-EPOLL_CTL_DEL = 2
-EPOLL_CTL_MOD = 3
-
-class EpollData(Union):
- _fields_ = [("ptr", c_void_p),
- ("fd", c_int),
- ("u32", c_uint32),
- ("u64", c_uint64)]
-
-class EpollEvent(Structure):
- _fields_ = [("events", c_uint32),
- ("data", EpollData)]
- _pack_ = 1
-
-class Epoll(object):
- efd = -1
-
- def __init__(self):
- self.efd = epoll_create(10)
- if self.efd == -1:
- raise OSError(errno.value, 'epoll_create failed')
-
- self.maxevents = 1024 # XXX arbitrary
- epoll_event_array = EpollEvent * self.maxevents
- self.events = epoll_event_array()
-
- def poll(self, timeout=1):
- if timeout is None:
- timeout = -1
- else:
- timeout *= 1000
- timeout = int(timeout)
- while True:
- n = epoll_wait(self.efd, byref(self.events), self.maxevents,
- timeout)
- if n == -1:
- e = errno.value
- if e in (EINTR, EAGAIN):
- continue
- else:
- raise OSError(e, 'epoll_wait failed')
- else:
- readable_fd_list = []
- writable_fd_list = []
- error_fd_list = []
- for i in xrange(n):
- ev = self.events[i]
- fd = int(ev.data.fd)
- if ev.events & EPOLLIN:
- readable_fd_list.append(fd)
- if ev.events & EPOLLOUT:
- writable_fd_list.append(fd)
- if ev.events & (EPOLLERR | EPOLLHUP):
- error_fd_list.append(fd)
- return readable_fd_list, writable_fd_list, error_fd_list
-
- def register(self, fd):
- ev = EpollEvent()
- ev.data.fd = fd
- ret = epoll_ctl(self.efd, EPOLL_CTL_ADD, fd, byref(ev))
- if ret == -1:
- raise OSError(errno.value, 'epoll_ctl failed')
-
- def modify(self, fd, readable, writable):
- ev = EpollEvent()
- ev.data.fd = fd
- events = 0
- if readable:
- events |= EPOLLIN
- if writable:
- events |= EPOLLOUT
- ev.events = events
- ret = epoll_ctl(self.efd, EPOLL_CTL_MOD, fd, byref(ev))
- if ret == -1:
- raise OSError(errno.value, 'epoll_ctl failed')
-
- def unregister(self, fd):
- ev = EpollEvent()
- ret = epoll_ctl(self.efd, EPOLL_CTL_DEL, fd, byref(ev))
- if ret == -1:
- raise OSError(errno.value, 'epoll_ctl failed')
-
- def __del__(self):
- if self.efd >= 0:
- close(self.efd)
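Epoll.poll above does two small conversions around epoll_wait: it turns a timeout in seconds into milliseconds (None meaning "block indefinitely"), and it sorts the returned event masks into readable, writable and error lists. A Python 3 sketch of just those two steps, without ctypes; the function names are hypothetical, and the flag values are copied from the constants in the removed file:

```python
# Flag values as defined in the removed epoll.py.
EPOLLIN = 0x001
EPOLLOUT = 0x004
EPOLLERR = 0x008
EPOLLHUP = 0x010


def to_epoll_timeout(timeout):
    """Convert seconds to the millisecond value epoll_wait expects."""
    if timeout is None:
        return -1  # block until an event arrives
    return int(timeout * 1000)


def classify_events(events):
    """Split (fd, mask) pairs into readable, writable and error fd lists."""
    readable, writable, errors = [], [], []
    for fd, mask in events:
        if mask & EPOLLIN:
            readable.append(fd)
        if mask & EPOLLOUT:
            writable.append(fd)
        if mask & (EPOLLERR | EPOLLHUP):
            errors.append(fd)
    return readable, writable, errors
```

A single descriptor can land in more than one list, which is why the event manager has to tolerate a connection being closed by an earlier callback in the same pass.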
Removed: trunk/neo/event.py
==============================================================================
--- trunk/neo/event.py [iso-8859-1] (original)
+++ trunk/neo/event.py (removed)
@@ -1,204 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-from time import time
-import neo
-from neo.epoll import Epoll
-from neo.profiling import profiler_decorator
-
-class EpollEventManager(object):
- """This class manages connections and events based on epoll(4)."""
-
- def __init__(self):
- self.connection_dict = {}
- self.reader_set = set([])
- self.writer_set = set([])
- self.prev_time = time()
- self.epoll = Epoll()
- self._pending_processing = []
-
- def getConnectionList(self):
- # XXX: use index
- return [x for x in self.connection_dict.values() if not x.isAborted()]
-
- def getClientList(self):
- # XXX: use index
- return [c for c in self.getConnectionList() if c.isClient()]
-
- def getServerList(self):
- # XXX: use index
- return [c for c in self.getConnectionList() if c.isServer()]
-
- def getConnectionListByUUID(self, uuid):
- """ Return the list of connections associated with the UUID (empty if
- none match), or None if the UUID is None"""
- # XXX: use index
- # XXX: consider remove UUID from connection and thus this method
- if uuid is None:
- return None
- result = []
- append = result.append
- for conn in self.getConnectionList():
- if conn.getUUID() == uuid:
- append(conn)
- return result
-
- def register(self, conn):
- fd = conn.getConnector().getDescriptor()
- self.connection_dict[fd] = conn
- self.epoll.register(fd)
-
- def unregister(self, conn):
- new_pending_processing = [x for x in self._pending_processing
- if x is not conn]
- # Check that we removed at most one entry from
- # self._pending_processing .
- assert len(new_pending_processing) > len(self._pending_processing) - 2
- self._pending_processing = new_pending_processing
- fd = conn.getConnector().getDescriptor()
- self.epoll.unregister(fd)
- del self.connection_dict[fd]
-
- def _getPendingConnection(self):
- if len(self._pending_processing):
- result = self._pending_processing.pop(0)
- else:
- result = None
- return result
-
- def _addPendingConnection(self, conn):
- pending_processing = self._pending_processing
- if conn not in pending_processing:
- pending_processing.append(conn)
-
- def poll(self, timeout=1):
- to_process = self._getPendingConnection()
- if to_process is None:
- # Fetch messages from polled file descriptors
- self._poll(timeout=timeout)
- # See if there is anything to process
- to_process = self._getPendingConnection()
- if to_process is not None:
- to_process.lock()
- try:
- try:
- # Process
- to_process.process()
- finally:
- # ...and requeue if there are pending messages
- if to_process.hasPendingMessages():
- self._addPendingConnection(to_process)
- finally:
- to_process.unlock()
- # Non-blocking call: as we handled a packet, we should just offer
- # poll a chance to fetch & send already-available data, but it must
- # not delay us.
- self._poll(timeout=0)
-
- def _poll(self, timeout=1):
- rlist, wlist, elist = self.epoll.poll(timeout)
- for fd in frozenset(rlist):
- conn = self.connection_dict[fd]
- conn.lock()
- try:
- conn.readable()
- finally:
- conn.unlock()
- if conn.hasPendingMessages():
- self._addPendingConnection(conn)
-
- for fd in frozenset(wlist):
- # This can fail, if a connection is closed in readable().
- try:
- conn = self.connection_dict[fd]
- except KeyError:
- pass
- else:
- conn.lock()
- try:
- conn.writable()
- finally:
- conn.unlock()
-
- for fd in frozenset(elist):
- # This can fail, if a connection is closed in previous calls to
- # readable() or writable().
- try:
- conn = self.connection_dict[fd]
- except KeyError:
- pass
- else:
- conn.lock()
- try:
- conn.readable()
- finally:
- conn.unlock()
-
- t = time()
- for conn in self.connection_dict.values():
- conn.lock()
- try:
- conn.checkTimeout(t)
- finally:
- conn.unlock()
-
- def addReader(self, conn):
- connector = conn.getConnector()
- assert connector is not None, conn.whoSetConnector()
- fd = connector.getDescriptor()
- if fd not in self.reader_set:
- self.reader_set.add(fd)
- self.epoll.modify(fd, 1, fd in self.writer_set)
-
- def removeReader(self, conn):
- connector = conn.getConnector()
- assert connector is not None, conn.whoSetConnector()
- fd = connector.getDescriptor()
- if fd in self.reader_set:
- self.reader_set.remove(fd)
- self.epoll.modify(fd, 0, fd in self.writer_set)
-
- @profiler_decorator
- def addWriter(self, conn):
- connector = conn.getConnector()
- assert connector is not None, conn.whoSetConnector()
- fd = connector.getDescriptor()
- if fd not in self.writer_set:
- self.writer_set.add(fd)
- self.epoll.modify(fd, fd in self.reader_set, 1)
-
- def removeWriter(self, conn):
- connector = conn.getConnector()
- assert connector is not None, conn.whoSetConnector()
- fd = connector.getDescriptor()
- if fd in self.writer_set:
- self.writer_set.remove(fd)
- self.epoll.modify(fd, fd in self.reader_set, 0)
-
- def log(self):
- neo.logging.info('Event Manager:')
- neo.logging.info(' Readers: %r', [x for x in self.reader_set])
- neo.logging.info(' Writers: %r', [x for x in self.writer_set])
- neo.logging.info(' Connections:')
- pending_set = set(self._pending_processing)
- for fd, conn in self.connection_dict.items():
- neo.logging.info(' %r: %r (pending=%r)', fd, conn,
- conn in pending_set)
-
-
-# Default to EpollEventManager.
-EventManager = EpollEventManager
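EpollEventManager.poll handles one pending connection per call and requeues it if it still has messages, so a busy connection cannot starve the others. A Python 3 sketch of that scheduling alone, assuming no epoll and no locking; PendingQueue and FakeConn are hypothetical names, and FakeConn only mimics the process and hasPendingMessages methods used above:

```python
class FakeConn:
    """Hypothetical connection holding a list of queued messages."""

    def __init__(self, name, messages):
        self.name = name
        self.messages = list(messages)
        self.handled = []

    def process(self):
        # Handle exactly one queued message per call.
        self.handled.append(self.messages.pop(0))

    def hasPendingMessages(self):
        return bool(self.messages)


class PendingQueue:
    """FIFO of connections awaiting processing, without duplicates."""

    def __init__(self):
        self._pending = []

    def add(self, conn):
        if conn not in self._pending:
            self._pending.append(conn)

    def process_one(self):
        """Process the oldest pending connection; return it, or None."""
        if not self._pending:
            return None
        conn = self._pending.pop(0)
        try:
            conn.process()
        finally:
            # Requeue at the tail if messages remain, even after an error.
            if conn.hasPendingMessages():
                self.add(conn)
        return conn
```

Requeueing at the tail gives round-robin behaviour across connections, at the cost of a linear membership test on each add.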
Removed: trunk/neo/exception.py
==============================================================================
--- trunk/neo/exception.py [iso-8859-1] (original)
+++ trunk/neo/exception.py (removed)
@@ -1,31 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-class NeoException(Exception):
- pass
-
-class ElectionFailure(NeoException):
- pass
-
-class PrimaryFailure(NeoException):
- pass
-
-class OperationFailure(NeoException):
- pass
-
-class DatabaseFailure(NeoException):
- pass
Removed: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py (removed)
@@ -1,525 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-import neo
-from neo.protocol import NodeStates, ErrorCodes, Packets, Errors
-from neo.protocol import PacketMalformedError, UnexpectedPacketError, \
- BrokenNodeDisallowedError, NotReadyError, ProtocolError
-
-
-class EventHandler(object):
- """This class handles events."""
-
- def __init__(self, app):
- self.app = app
- self.packet_dispatch_table = self.__initPacketDispatchTable()
- self.error_dispatch_table = self.__initErrorDispatchTable()
-
- def __repr__(self):
- return self.__class__.__name__
-
- def __unexpectedPacket(self, conn, packet, message=None):
- """Handle an unexpected packet."""
- if message is None:
- message = 'unexpected packet type %s in %s' % (packet.getType(),
- self.__class__.__name__)
- else:
- message = 'unexpected packet: %s in %s' % (message,
- self.__class__.__name__)
- neo.logging.error(message)
- conn.answer(Errors.ProtocolError(message))
- conn.abort()
- self.peerBroken(conn)
-
- def dispatch(self, conn, packet):
- """This is a helper method to handle various packet types."""
- try:
- try:
- method = self.packet_dispatch_table[packet.getType()]
- except KeyError:
- raise UnexpectedPacketError('no handler found')
- args = packet.decode() or ()
- conn.setPeerId(packet.getId())
- method(conn, *args)
- except UnexpectedPacketError, e:
- self.__unexpectedPacket(conn, packet, *e.args)
- except PacketMalformedError:
- neo.logging.error('malformed packet from %r', conn)
- conn.notify(Packets.Notify('Malformed packet: %r' % (packet, )))
- conn.abort()
- self.peerBroken(conn)
- except BrokenNodeDisallowedError:
- conn.answer(Errors.Broken('go away'))
- conn.abort()
- self.connectionClosed(conn)
- except NotReadyError, message:
- if not message.args:
- message = 'Retry Later'
- message = str(message)
- conn.answer(Errors.NotReady(message))
- conn.abort()
- self.connectionClosed(conn)
- except ProtocolError, message:
- message = str(message)
- conn.answer(Errors.ProtocolError(message))
- conn.abort()
- self.connectionClosed(conn)
-
- def checkClusterName(self, name):
- # raise an exception if the given name does not match the current cluster name
- if self.app.name != name:
- neo.logging.error('reject an alien cluster')
- raise ProtocolError('invalid cluster name')
-
-
- # Network level handlers
-
- def packetReceived(self, conn, packet):
- """Called when a packet is received."""
- self.dispatch(conn, packet)
-
- def connectionStarted(self, conn):
- """Called when a connection is started."""
- neo.logging.debug('connection started for %r', conn)
-
- def connectionCompleted(self, conn):
- """Called when a connection is completed."""
- neo.logging.debug('connection completed for %r', conn)
-
- def connectionFailed(self, conn):
- """Called when a connection failed."""
- neo.logging.debug('connection failed for %r', conn)
-
- def connectionAccepted(self, conn):
- """Called when a connection is accepted."""
-
- def timeoutExpired(self, conn):
- """Called when a timeout event occurs."""
- neo.logging.debug('timeout expired for %r', conn)
- self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)
-
- def connectionClosed(self, conn):
- """Called when a connection is closed by the peer."""
- neo.logging.debug('connection closed for %r', conn)
- self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)
-
- def peerBroken(self, conn):
- """Called when a peer is broken."""
- neo.logging.error('%r is broken', conn)
- self.connectionLost(conn, NodeStates.BROKEN)
-
- def connectionLost(self, conn, new_state):
- """ Override this method in sub-handlers when there is no need to
- distinguish between the kinds of event that closed the connection """
- pass
-
-
- # Packet handlers.
-
- def notify(self, conn, message):
- neo.logging.info('notification from %r: %s', conn, message)
-
- def requestIdentification(self, conn, node_type,
- uuid, address, name):
- raise UnexpectedPacketError
-
- def acceptIdentification(self, conn, node_type,
- uuid, num_partitions, num_replicas, your_uuid):
- raise UnexpectedPacketError
-
- def askPrimary(self, conn):
- raise UnexpectedPacketError
-
- def answerPrimary(self, conn, primary_uuid,
- known_master_list):
- raise UnexpectedPacketError
-
- def announcePrimary(self, conn):
- raise UnexpectedPacketError
-
- def reelectPrimary(self, conn):
- raise UnexpectedPacketError
-
- def notifyNodeInformation(self, conn, node_list):
- raise UnexpectedPacketError
-
- def askLastIDs(self, conn):
- raise UnexpectedPacketError
-
- def answerLastIDs(self, conn, loid, ltid, lptid):
- raise UnexpectedPacketError
-
- def askPartitionTable(self, conn):
- raise UnexpectedPacketError
-
- def answerPartitionTable(self, conn, ptid, row_list):
- raise UnexpectedPacketError
-
- def sendPartitionTable(self, conn, ptid, row_list):
- raise UnexpectedPacketError
-
- def notifyPartitionChanges(self, conn, ptid, cell_list):
- raise UnexpectedPacketError
-
- def startOperation(self, conn):
- raise UnexpectedPacketError
-
- def stopOperation(self, conn):
- raise UnexpectedPacketError
-
- def askUnfinishedTransactions(self, conn):
- raise UnexpectedPacketError
-
- def answerUnfinishedTransactions(self, conn, tid_list):
- raise UnexpectedPacketError
-
- def askObjectPresent(self, conn, oid, tid):
- raise UnexpectedPacketError
-
- def answerObjectPresent(self, conn, oid, tid):
- raise UnexpectedPacketError
-
- def deleteTransaction(self, conn, tid, oid_list):
- raise UnexpectedPacketError
-
- def commitTransaction(self, conn, tid):
- raise UnexpectedPacketError
-
- def askBeginTransaction(self, conn, tid):
- raise UnexpectedPacketError
-
- def answerBeginTransaction(self, conn, ttid):
- raise UnexpectedPacketError
-
- def askNewOIDs(self, conn, num_oids):
- raise UnexpectedPacketError
-
- def answerNewOIDs(self, conn, num_oids):
- raise UnexpectedPacketError
-
- def askFinishTransaction(self, conn, ttid, oid_list):
- raise UnexpectedPacketError
-
- def answerTransactionFinished(self, conn, ttid, tid):
- raise UnexpectedPacketError
-
- def askLockInformation(self, conn, ttid, tid, oid_list):
- raise UnexpectedPacketError
-
- def answerInformationLocked(self, conn, ttid):
- raise UnexpectedPacketError
-
- def invalidateObjects(self, conn, tid, oid_list):
- raise UnexpectedPacketError
-
- def notifyUnlockInformation(self, conn, ttid):
- raise UnexpectedPacketError
-
- def askStoreObject(self, conn, oid, serial,
- compression, checksum, data, data_serial, ttid, unlock):
- raise UnexpectedPacketError
-
- def answerStoreObject(self, conn, conflicting, oid, serial):
- raise UnexpectedPacketError
-
- def abortTransaction(self, conn, ttid):
- raise UnexpectedPacketError
-
- def askStoreTransaction(self, conn, ttid, user, desc,
- ext, oid_list):
- raise UnexpectedPacketError
-
- def answerStoreTransaction(self, conn, ttid):
- raise UnexpectedPacketError
-
- def askObject(self, conn, oid, serial, ttid):
- raise UnexpectedPacketError
-
- def answerObject(self, conn, oid, serial_start,
- serial_end, compression, checksum, data, data_serial):
- raise UnexpectedPacketError
-
- def askTIDs(self, conn, first, last, partition):
- raise UnexpectedPacketError
-
- def answerTIDs(self, conn, tid_list):
- raise UnexpectedPacketError
-
- def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
- raise UnexpectedPacketError
-
- def answerTIDsFrom(self, conn, tid_list):
- raise UnexpectedPacketError
-
- def askTransactionInformation(self, conn, tid):
- raise UnexpectedPacketError
-
- def answerTransactionInformation(self, conn, tid,
- user, desc, ext, packed, oid_list):
- raise UnexpectedPacketError
-
- def askObjectHistory(self, conn, oid, first, last):
- raise UnexpectedPacketError
-
- def answerObjectHistory(self, conn, oid, history_list):
- raise UnexpectedPacketError
-
- def askObjectHistoryFrom(self, conn, oid, min_serial, max_serial, length,
- partition):
- raise UnexpectedPacketError
-
- def answerObjectHistoryFrom(self, conn, object_dict):
- raise UnexpectedPacketError
-
- def askPartitionList(self, conn, min_offset, max_offset, uuid):
- raise UnexpectedPacketError
-
- def answerPartitionList(self, conn, ptid, row_list):
- raise UnexpectedPacketError
-
- def askNodeList(self, conn, offset_list):
- raise UnexpectedPacketError
-
- def answerNodeList(self, conn, node_list):
- raise UnexpectedPacketError
-
- def setNodeState(self, conn, uuid, state, modify_partition_table):
- raise UnexpectedPacketError
-
- def addPendingNodes(self, conn, uuid_list):
- raise UnexpectedPacketError
-
- def askNodeInformation(self, conn):
- raise UnexpectedPacketError
-
- def answerNodeInformation(self, conn):
- raise UnexpectedPacketError
-
- def askClusterState(self, conn):
- raise UnexpectedPacketError
-
- def answerClusterState(self, conn, state):
- raise UnexpectedPacketError
-
- def setClusterState(self, conn, state):
- raise UnexpectedPacketError
-
- def notifyClusterInformation(self, conn, state):
- raise UnexpectedPacketError
-
- def notifyLastOID(self, conn, oid):
- raise UnexpectedPacketError
-
- def notifyReplicationDone(self, conn, offset):
- raise UnexpectedPacketError
-
- def askObjectUndoSerial(self, conn, ttid, ltid, undone_tid, oid_list):
- raise UnexpectedPacketError
-
- def answerObjectUndoSerial(self, conn, object_tid_dict):
- raise UnexpectedPacketError
-
- def askHasLock(self, conn, ttid, oid):
- raise UnexpectedPacketError
-
- def answerHasLock(self, conn, oid, status):
- raise UnexpectedPacketError
-
- def askBarrier(self, conn):
- conn.answer(Packets.AnswerBarrier())
-
- def answerBarrier(self, conn):
- pass
-
- def askPack(self, conn, tid):
- raise UnexpectedPacketError
-
- def answerPack(self, conn, status):
- raise UnexpectedPacketError
-
- def askCheckTIDRange(self, conn, min_tid, max_tid, length, partition):
- raise UnexpectedPacketError
-
- def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
- max_tid):
- raise UnexpectedPacketError
-
- def askCheckSerialRange(self, conn, min_oid, min_serial, max_tid, length,
- partition):
- raise UnexpectedPacketError
-
- def answerCheckSerialRange(self, conn, min_oid, min_serial, length, count,
- oid_checksum, max_oid, serial_checksum, max_serial):
- raise UnexpectedPacketError
-
- def notifyReady(self, conn):
- raise UnexpectedPacketError
-
- def askLastTransaction(self, conn):
- raise UnexpectedPacketError
-
- def answerLastTransaction(self, conn, tid):
- raise UnexpectedPacketError
-
- def askCheckCurrentSerial(self, conn, ttid, serial, oid):
- raise UnexpectedPacketError
-
- answerCheckCurrentSerial = answerStoreObject
-
- # Error packet handlers.
-
- def error(self, conn, code, message):
- try:
- method = self.error_dispatch_table[code]
- method(conn, message)
- except ValueError:
- raise UnexpectedPacketError(message)
-
- def notReady(self, conn, message):
- raise UnexpectedPacketError
-
- def oidNotFound(self, conn, message):
- raise UnexpectedPacketError
-
- def oidDoesNotExist(self, conn, message):
- raise UnexpectedPacketError
-
- def tidNotFound(self, conn, message):
- raise UnexpectedPacketError
-
- def protocolError(self, conn, message):
- # the connection should have been closed by the remote peer
- neo.logging.error('protocol error: %s' % (message,))
-
- def timeoutError(self, conn, message):
- neo.logging.error('timeout error: %s' % (message,))
-
- def brokenNodeDisallowedError(self, conn, message):
- raise RuntimeError, 'broken node disallowed error: %s' % (message,)
-
- def alreadyPendingError(self, conn, message):
- neo.logging.error('already pending error: %s' % (message, ))
-
- def ack(self, conn, message):
- neo.logging.debug("no error message : %s" % (message))
-
-
- # Fetch tables initialization
-
- def __initPacketDispatchTable(self):
- d = {}
-
- d[Packets.Error] = self.error
- d[Packets.Notify] = self.notify
- d[Packets.RequestIdentification] = self.requestIdentification
- d[Packets.AcceptIdentification] = self.acceptIdentification
- d[Packets.AskPrimary] = self.askPrimary
- d[Packets.AnswerPrimary] = self.answerPrimary
- d[Packets.AnnouncePrimary] = self.announcePrimary
- d[Packets.ReelectPrimary] = self.reelectPrimary
- d[Packets.NotifyNodeInformation] = self.notifyNodeInformation
- d[Packets.AskLastIDs] = self.askLastIDs
- d[Packets.AnswerLastIDs] = self.answerLastIDs
- d[Packets.AskPartitionTable] = self.askPartitionTable
- d[Packets.AnswerPartitionTable] = self.answerPartitionTable
- d[Packets.SendPartitionTable] = self.sendPartitionTable
- d[Packets.NotifyPartitionChanges] = self.notifyPartitionChanges
- d[Packets.StartOperation] = self.startOperation
- d[Packets.StopOperation] = self.stopOperation
- d[Packets.AskUnfinishedTransactions] = self.askUnfinishedTransactions
- d[Packets.AnswerUnfinishedTransactions] = \
- self.answerUnfinishedTransactions
- d[Packets.AskObjectPresent] = self.askObjectPresent
- d[Packets.AnswerObjectPresent] = self.answerObjectPresent
- d[Packets.DeleteTransaction] = self.deleteTransaction
- d[Packets.CommitTransaction] = self.commitTransaction
- d[Packets.AskBeginTransaction] = self.askBeginTransaction
- d[Packets.AnswerBeginTransaction] = self.answerBeginTransaction
- d[Packets.AskFinishTransaction] = self.askFinishTransaction
- d[Packets.AnswerTransactionFinished] = self.answerTransactionFinished
- d[Packets.AskLockInformation] = self.askLockInformation
- d[Packets.AnswerInformationLocked] = self.answerInformationLocked
- d[Packets.InvalidateObjects] = self.invalidateObjects
- d[Packets.NotifyUnlockInformation] = self.notifyUnlockInformation
- d[Packets.AskNewOIDs] = self.askNewOIDs
- d[Packets.AnswerNewOIDs] = self.answerNewOIDs
- d[Packets.AskStoreObject] = self.askStoreObject
- d[Packets.AnswerStoreObject] = self.answerStoreObject
- d[Packets.AbortTransaction] = self.abortTransaction
- d[Packets.AskStoreTransaction] = self.askStoreTransaction
- d[Packets.AnswerStoreTransaction] = self.answerStoreTransaction
- d[Packets.AskObject] = self.askObject
- d[Packets.AnswerObject] = self.answerObject
- d[Packets.AskTIDs] = self.askTIDs
- d[Packets.AnswerTIDs] = self.answerTIDs
- d[Packets.AskTIDsFrom] = self.askTIDsFrom
- d[Packets.AnswerTIDsFrom] = self.answerTIDsFrom
- d[Packets.AskTransactionInformation] = self.askTransactionInformation
- d[Packets.AnswerTransactionInformation] = \
- self.answerTransactionInformation
- d[Packets.AskObjectHistory] = self.askObjectHistory
- d[Packets.AnswerObjectHistory] = self.answerObjectHistory
- d[Packets.AskObjectHistoryFrom] = self.askObjectHistoryFrom
- d[Packets.AnswerObjectHistoryFrom] = self.answerObjectHistoryFrom
- d[Packets.AskPartitionList] = self.askPartitionList
- d[Packets.AnswerPartitionList] = self.answerPartitionList
- d[Packets.AskNodeList] = self.askNodeList
- d[Packets.AnswerNodeList] = self.answerNodeList
- d[Packets.SetNodeState] = self.setNodeState
- d[Packets.SetClusterState] = self.setClusterState
- d[Packets.AddPendingNodes] = self.addPendingNodes
- d[Packets.AskNodeInformation] = self.askNodeInformation
- d[Packets.AnswerNodeInformation] = self.answerNodeInformation
- d[Packets.AskClusterState] = self.askClusterState
- d[Packets.AnswerClusterState] = self.answerClusterState
- d[Packets.NotifyClusterInformation] = self.notifyClusterInformation
- d[Packets.NotifyLastOID] = self.notifyLastOID
- d[Packets.NotifyReplicationDone] = self.notifyReplicationDone
- d[Packets.AskObjectUndoSerial] = self.askObjectUndoSerial
- d[Packets.AnswerObjectUndoSerial] = self.answerObjectUndoSerial
- d[Packets.AskHasLock] = self.askHasLock
- d[Packets.AnswerHasLock] = self.answerHasLock
- d[Packets.AskBarrier] = self.askBarrier
- d[Packets.AnswerBarrier] = self.answerBarrier
- d[Packets.AskPack] = self.askPack
- d[Packets.AnswerPack] = self.answerPack
- d[Packets.AskCheckTIDRange] = self.askCheckTIDRange
- d[Packets.AnswerCheckTIDRange] = self.answerCheckTIDRange
- d[Packets.AskCheckSerialRange] = self.askCheckSerialRange
- d[Packets.AnswerCheckSerialRange] = self.answerCheckSerialRange
- d[Packets.NotifyReady] = self.notifyReady
- d[Packets.AskLastTransaction] = self.askLastTransaction
- d[Packets.AnswerLastTransaction] = self.answerLastTransaction
- d[Packets.AskCheckCurrentSerial] = self.askCheckCurrentSerial
- d[Packets.AnswerCheckCurrentSerial] = self.answerCheckCurrentSerial
-
- return d
-
- def __initErrorDispatchTable(self):
- d = {}
-
- d[ErrorCodes.ACK] = self.ack
- d[ErrorCodes.NOT_READY] = self.notReady
- d[ErrorCodes.OID_NOT_FOUND] = self.oidNotFound
- d[ErrorCodes.OID_DOES_NOT_EXIST] = self.oidDoesNotExist
- d[ErrorCodes.TID_NOT_FOUND] = self.tidNotFound
- d[ErrorCodes.PROTOCOL_ERROR] = self.protocolError
- d[ErrorCodes.BROKEN_NODE] = self.brokenNodeDisallowedError
- d[ErrorCodes.ALREADY_PENDING] = self.alreadyPendingError
-
- return d
-
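Editor's note on the dispatch tables removed above: the pattern can be sketched in isolation as follows. This is a minimal illustration, not the project's code. It assumes error_dispatch_table is a plain dict, as __initErrorDispatchTable suggests, and the numeric codes, the ErrorDispatcher class and the not_ready name are hypothetical stand-ins. A missing dict key raises KeyError, so the sketch catches that rather than the ValueError caught by the removed error() method.

```python
class UnexpectedPacketError(Exception):
    """Stand-in for the exception of the same name in neo's protocol module."""


class ErrorDispatcher(object):
    """Hypothetical, reduced handler: maps error codes to bound methods."""

    def __init__(self):
        self.seen = []
        # Hypothetical numeric codes, for illustration only.
        self.error_dispatch_table = {0: self.ack, 1: self.not_ready}

    def ack(self, conn, message):
        self.seen.append(('ack', message))

    def not_ready(self, conn, message):
        raise UnexpectedPacketError(message)

    def error(self, conn, code, message):
        try:
            method = self.error_dispatch_table[code]
        except KeyError:
            # Unknown code: a dict lookup fails with KeyError.
            raise UnexpectedPacketError(message)
        # Called outside the try block so that errors raised by the
        # handler itself are not mistaken for an unknown code.
        method(conn, message)
```

Keeping the lookup and the call separate is a design choice of this sketch; the removed method performs both inside one try block.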
Copied: trunk/neo/lib/attributeTracker.py (from r2614, trunk/neo/attributeTracker.py)
==============================================================================
--- trunk/neo/attributeTracker.py [iso-8859-1] (original)
+++ trunk/neo/lib/attributeTracker.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,7 +17,7 @@
ATTRIBUTE_TRACKER_ENABLED = False
-from neo.locking import LockUser
+from neo.lib.locking import LockUser
"""
Usage example:
Copied: trunk/neo/lib/bootstrap.py (from r2614, trunk/neo/bootstrap.py)
==============================================================================
--- trunk/neo/bootstrap.py [iso-8859-1] (original)
+++ trunk/neo/lib/bootstrap.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -18,10 +18,10 @@
import neo
from time import sleep
-from neo.handler import EventHandler
-from neo.protocol import Packets
-from neo.util import dump
-from neo.connection import ClientConnection
+from neo.lib.handler import EventHandler
+from neo.lib.protocol import Packets
+from neo.lib.util import dump
+from neo.lib.connection import ClientConnection
NO_SERVER = ('0.0.0.0', 0)
@@ -106,7 +106,7 @@ class BootstrapManager(EventHandler):
conn.close()
return
- neo.logging.info('connected to a primary master node')
+ neo.lib.logging.info('connected to a primary master node')
conn.ask(Packets.RequestIdentification(self.node_type,
self.uuid, self.server, self.name))
@@ -120,7 +120,7 @@ class BootstrapManager(EventHandler):
if self.uuid != your_uuid:
# got an uuid from the primary master
self.uuid = your_uuid
- neo.logging.info('Got a new UUID : %s' % dump(self.uuid))
+ neo.lib.logging.info('Got a new UUID : %s' % dump(self.uuid))
conn.setUUID(uuid)
def getPrimaryConnection(self, connector_handler):
@@ -128,7 +128,7 @@ class BootstrapManager(EventHandler):
Primary lookup/connection process.
Returns when the connection is made.
"""
- neo.logging.info('connecting to a primary master node')
+ neo.lib.logging.info('connecting to a primary master node')
em, nm = self.app.em, self.app.nm
index = 0
self.current = nm.getMasterList()[0]
Copied: trunk/neo/lib/config.py (from r2614, trunk/neo/config.py)
==============================================================================
--- trunk/neo/config.py [iso-8859-1] (original)
+++ trunk/neo/lib/config.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from ConfigParser import SafeConfigParser
-from neo import util
+from neo.lib import util
class ConfigurationManager(object):
Copied: trunk/neo/lib/connection.py (from r2614, trunk/neo/connection.py)
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/lib/connection.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,19 +17,19 @@
from time import time
-import neo
-from neo.locking import RLock
+import neo.lib
+from neo.lib.locking import RLock
-from neo.protocol import PacketMalformedError, Packets, ParserState
-from neo.connector import ConnectorException, ConnectorTryAgainException, \
+from neo.lib.protocol import PacketMalformedError, Packets, ParserState
+from neo.lib.connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException, \
ConnectorConnectionClosedException
-from neo.util import dump
-from neo.logger import PACKET_LOGGER
+from neo.lib.util import dump
+from neo.lib.logger import PACKET_LOGGER
-from neo import attributeTracker
-from neo.util import ReadBuffer
-from neo.profiling import profiler_decorator
+from neo.lib import attributeTracker
+from neo.lib.util import ReadBuffer
+from neo.lib.profiling import profiler_decorator
PING_DELAY = 6
PING_TIMEOUT = 5
@@ -62,7 +62,7 @@ def lockCheckWrapper(func):
def wrapper(self, *args, **kw):
if not self._lock._is_owned():
import traceback
- neo.logging.warning('%s called on %s instance without being ' \
+ neo.lib.logging.warning('%s called on %s instance without being ' \
'locked. Stack:\n%s', func.func_code.co_name,
self.__class__.__name__, ''.join(traceback.format_stack()))
# Call anyway
@@ -163,7 +163,7 @@ class HandlerSwitcher(object):
assert len(self._pending) == 1 or self._pending[0][0]
PACKET_LOGGER.dispatch(connection, packet, 'from')
if connection.isClosed() and packet.ignoreOnClosedConnection():
- neo.logging.debug('Ignoring packet %r on closed connection %r',
+ neo.lib.logging.debug('Ignoring packet %r on closed connection %r',
packet, connection)
return
msg_id = packet.getId()
@@ -177,7 +177,8 @@ class HandlerSwitcher(object):
if klass and isinstance(packet, klass) or packet.isError():
handler.packetReceived(connection, packet)
else:
- neo.logging.error('Unexpected answer %r in %r', packet, connection)
+ neo.lib.logging.error(
+ 'Unexpected answer %r in %r', packet, connection)
notification = Packets.Notify('Unexpected answer: %r' % packet)
try:
connection.notify(notification)
@@ -188,7 +189,8 @@ class HandlerSwitcher(object):
# apply a pending handler if no more answers are pending
while len(self._pending) > 1 and not self._pending[0][0]:
del self._pending[0]
- neo.logging.debug('Apply handler %r on %r', self._pending[0][1],
+ neo.lib.logging.debug(
+ 'Apply handler %r on %r', self._pending[0][1],
connection)
if timeout == self._next_timeout:
self._updateNextTimeout()
@@ -286,12 +288,13 @@ class BaseConnection(object):
if handlers.isPending():
msg_id = handlers.checkTimeout(self, t)
if msg_id is not None:
- neo.logging.info('timeout for #0x%08x with %r', msg_id, self)
+ neo.lib.logging.info(
+ 'timeout for #0x%08x with %r', msg_id, self)
self.close()
self.getHandler().timeoutExpired(self)
elif self._timeout.hardExpired(t):
# critical time reach or pong not received, abort
- neo.logging.info('timeout with %r', self)
+ neo.lib.logging.info('timeout with %r', self)
self.notify(Packets.Notify('Timeout'))
self.abort()
self.getHandler().timeoutExpired(self)
@@ -345,9 +348,9 @@ class BaseConnection(object):
def setHandler(self, handler):
if self._handlers.setHandler(handler):
- neo.logging.debug('Set handler %r on %r', handler, self)
+ neo.lib.logging.debug('Set handler %r on %r', handler, self)
else:
- neo.logging.debug('Delay handler %r on %r', handler, self)
+ neo.lib.logging.debug('Delay handler %r on %r', handler, self)
def getEventManager(self):
return self.em
@@ -386,7 +389,7 @@ class ListeningConnection(BaseConnection
"""A listen connection."""
def __init__(self, event_manager, handler, addr, connector, **kw):
- neo.logging.debug('listening to %s:%d', *addr)
+ neo.lib.logging.debug('listening to %s:%d', *addr)
BaseConnection.__init__(self, event_manager, handler,
addr=addr, connector=connector)
self.connector.makeListeningConnection(addr)
@@ -395,7 +398,7 @@ class ListeningConnection(BaseConnection
def readable(self):
try:
new_s, addr = self.connector.getNewConnection()
- neo.logging.debug('accepted a connection from %s:%d', *addr)
+ neo.lib.logging.debug('accepted a connection from %s:%d', *addr)
handler = self.getHandler()
new_conn = ServerConnection(self.getEventManager(), handler,
connector=new_s, addr=addr)
@@ -458,7 +461,7 @@ class Connection(BaseConnection):
return next_id
def close(self):
- neo.logging.debug('closing a connector for %r', self)
+ neo.lib.logging.debug('closing a connector for %r', self)
BaseConnection.close(self)
if self._on_close is not None:
self._on_close()
@@ -469,7 +472,7 @@ class Connection(BaseConnection):
def abort(self):
"""Abort dealing with this connection."""
- neo.logging.debug('aborting a connector for %r', self)
+ neo.lib.logging.debug('aborting a connector for %r', self)
self.aborted = True
def writable(self):
@@ -556,17 +559,19 @@ class Connection(BaseConnection):
except ConnectorConnectionClosedException:
# connection resetted by peer, according to the man, this error
# should not occurs but it seems it's false
- neo.logging.debug('Connection reset by peer: %r', self.connector)
+ neo.lib.logging.debug(
+ 'Connection reset by peer: %r', self.connector)
self._closure()
except:
- neo.logging.debug('Unknown connection error: %r', self.connector)
+ neo.lib.logging.debug(
+ 'Unknown connection error: %r', self.connector)
self._closure()
# unhandled connector exception
raise
else:
if not data:
- neo.logging.debug('Connection %r closed in recv',
- self.connector)
+ neo.lib.logging.debug(
+ 'Connection %r closed in recv', self.connector)
self._closure()
return
self.read_buf.append(data)
@@ -583,16 +588,18 @@ class Connection(BaseConnection):
pass
except ConnectorConnectionClosedException:
# connection resetted by peer
- neo.logging.debug('Connection reset by peer: %r', self.connector)
+ neo.lib.logging.debug(
+ 'Connection reset by peer: %r', self.connector)
self._closure()
except:
- neo.logging.debug('Unknown connection error: %r', self.connector)
+ neo.lib.logging.debug(
+ 'Unknown connection error: %r', self.connector)
# unhandled connector exception
self._closure()
raise
else:
if not n:
- neo.logging.debug('Connection %r closed in send',
+ neo.lib.logging.debug('Connection %r closed in send',
self.connector)
self._closure()
return
Copied: trunk/neo/lib/dispatcher.py (from r2614, trunk/neo/dispatcher.py)
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/lib/dispatcher.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,8 +15,8 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-from neo.locking import Lock
-from neo.profiling import profiler_decorator
+from neo.lib.locking import Lock
+from neo.lib.profiling import profiler_decorator
EMPTY = {}
NOBODY = []
Copied: trunk/neo/lib/event.py (from r2614, trunk/neo/event.py)
==============================================================================
--- trunk/neo/event.py [iso-8859-1] (original)
+++ trunk/neo/lib/event.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -16,9 +16,9 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from time import time
-import neo
-from neo.epoll import Epoll
-from neo.profiling import profiler_decorator
+import neo.lib
+from neo.lib.epoll import Epoll
+from neo.lib.profiling import profiler_decorator
class EpollEventManager(object):
"""This class manages connections and events based on epoll(5)."""
@@ -190,13 +190,13 @@ class EpollEventManager(object):
self.epoll.modify(fd, fd in self.reader_set, 0)
def log(self):
- neo.logging.info('Event Manager:')
- neo.logging.info(' Readers: %r', [x for x in self.reader_set])
- neo.logging.info(' Writers: %r', [x for x in self.writer_set])
- neo.logging.info(' Connections:')
+ neo.lib.logging.info('Event Manager:')
+ neo.lib.logging.info(' Readers: %r', [x for x in self.reader_set])
+ neo.lib.logging.info(' Writers: %r', [x for x in self.writer_set])
+ neo.lib.logging.info(' Connections:')
pending_set = set(self._pending_processing)
for fd, conn in self.connection_dict.items():
- neo.logging.info(' %r: %r (pending=%r)', fd, conn,
+ neo.lib.logging.info(' %r: %r (pending=%r)', fd, conn,
conn in pending_set)
Copied: trunk/neo/lib/handler.py (from r2614, trunk/neo/handler.py)
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/lib/handler.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,9 +15,9 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-import neo
-from neo.protocol import NodeStates, ErrorCodes, Packets, Errors
-from neo.protocol import PacketMalformedError, UnexpectedPacketError, \
+import neo.lib
+from neo.lib.protocol import NodeStates, ErrorCodes, Packets, Errors
+from neo.lib.protocol import PacketMalformedError, UnexpectedPacketError, \
BrokenNodeDisallowedError, NotReadyError, ProtocolError
@@ -40,7 +40,7 @@ class EventHandler(object):
else:
message = 'unexpected packet: %s in %s' % (message,
self.__class__.__name__)
- neo.logging.error(message)
+ neo.lib.logging.error(message)
conn.answer(Errors.ProtocolError(message))
conn.abort()
self.peerBroken(conn)
@@ -58,7 +58,7 @@ class EventHandler(object):
except UnexpectedPacketError, e:
self.__unexpectedPacket(conn, packet, *e.args)
except PacketMalformedError:
- neo.logging.error('malformed packet from %r', conn)
+ neo.lib.logging.error('malformed packet from %r', conn)
conn.notify(Packets.Notify('Malformed packet: %r' % (packet, )))
conn.abort()
self.peerBroken(conn)
@@ -82,7 +82,7 @@ class EventHandler(object):
def checkClusterName(self, name):
# raise an exception if the fiven name mismatch the current cluster name
if self.app.name != name:
- neo.logging.error('reject an alien cluster')
+ neo.lib.logging.error('reject an alien cluster')
raise ProtocolError('invalid cluster name')
@@ -94,32 +94,32 @@ class EventHandler(object):
def connectionStarted(self, conn):
"""Called when a connection is started."""
- neo.logging.debug('connection started for %r', conn)
+ neo.lib.logging.debug('connection started for %r', conn)
def connectionCompleted(self, conn):
"""Called when a connection is completed."""
- neo.logging.debug('connection completed for %r', conn)
+ neo.lib.logging.debug('connection completed for %r', conn)
def connectionFailed(self, conn):
"""Called when a connection failed."""
- neo.logging.debug('connection failed for %r', conn)
+ neo.lib.logging.debug('connection failed for %r', conn)
def connectionAccepted(self, conn):
"""Called when a connection is accepted."""
def timeoutExpired(self, conn):
"""Called when a timeout event occurs."""
- neo.logging.debug('timeout expired for %r', conn)
+ neo.lib.logging.debug('timeout expired for %r', conn)
self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)
def connectionClosed(self, conn):
"""Called when a connection is closed by the peer."""
- neo.logging.debug('connection closed for %r', conn)
+ neo.lib.logging.debug('connection closed for %r', conn)
self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)
def peerBroken(self, conn):
"""Called when a peer is broken."""
- neo.logging.error('%r is broken', conn)
+ neo.lib.logging.error('%r is broken', conn)
self.connectionLost(conn, NodeStates.BROKEN)
def connectionLost(self, conn, new_state):
@@ -131,7 +131,7 @@ class EventHandler(object):
# Packet handlers.
def notify(self, conn, message):
- neo.logging.info('notification from %r: %s', conn, message)
+ neo.lib.logging.info('notification from %r: %s', conn, message)
def requestIdentification(self, conn, node_type,
uuid, address, name):
@@ -403,19 +403,19 @@ class EventHandler(object):
def protocolError(self, conn, message):
# the connection should have been closed by the remote peer
- neo.logging.error('protocol error: %s' % (message,))
+ neo.lib.logging.error('protocol error: %s' % (message,))
def timeoutError(self, conn, message):
- neo.logging.error('timeout error: %s' % (message,))
+ neo.lib.logging.error('timeout error: %s' % (message,))
def brokenNodeDisallowedError(self, conn, message):
raise RuntimeError, 'broken node disallowed error: %s' % (message,)
def alreadyPendingError(self, conn, message):
- neo.logging.error('already pending error: %s' % (message, ))
+ neo.lib.logging.error('already pending error: %s' % (message, ))
def ack(self, conn, message):
- neo.logging.debug("no error message : %s" % (message))
+ neo.lib.logging.debug("no error message : %s" % (message))
# Fetch tables initialization
Copied: trunk/neo/lib/logger.py (from r2614, trunk/neo/logger.py)
==============================================================================
--- trunk/neo/logger.py [iso-8859-1] (original)
+++ trunk/neo/lib/logger.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -16,10 +16,10 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import neo
-from neo.protocol import PacketMalformedError
-from neo.util import dump
-from neo.handler import EventHandler
-from neo.profiling import profiler_decorator
+from neo.lib.protocol import PacketMalformedError
+from neo.lib.util import dump
+from neo.lib.handler import EventHandler
+from neo.lib.profiling import profiler_decorator
LOGGER_ENABLED = False
@@ -37,7 +37,7 @@ class PacketLogger(object):
klass = packet.getType()
uuid = dump(conn.getUUID())
ip, port = conn.getAddress()
- neo.logging.debug('#0x%08x %-30s %s %s (%s:%d)', packet.getId(),
+ neo.lib.logging.debug('#0x%08x %-30s %s %s (%s:%d)', packet.getId(),
packet.__class__.__name__, direction, uuid, ip, port)
# look for custom packet logger
logger = self.packet_dispatch_table.get(klass, None)
@@ -48,11 +48,11 @@ class PacketLogger(object):
try:
args = packet.decode() or ()
except PacketMalformedError:
- neo.logging.warning("Can't decode packet for logging")
+ neo.lib.logging.warning("Can't decode packet for logging")
return
log_message = logger(conn, *args)
if log_message is not None:
- neo.logging.debug('#0x%08x %s', packet.getId(), log_message)
+ neo.lib.logging.debug('#0x%08x %s', packet.getId(), log_message)
def error(self, conn, code, message):
return "%s (%s)" % (code, message)
@@ -64,7 +64,7 @@ class PacketLogger(object):
else:
address = '?'
node = (dump(uuid), node_type, address, state)
- neo.logging.debug(' ! %s | %8s | %22s | %s' % node)
+ neo.lib.logging.debug(' ! %s | %8s | %22s | %s' % node)
PACKET_LOGGER = PacketLogger()
if not LOGGER_ENABLED:
Copied: trunk/neo/lib/node.py (from r2614, trunk/neo/node.py)
==============================================================================
--- trunk/neo/node.py [iso-8859-1] (original)
+++ trunk/neo/lib/node.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,11 +17,11 @@
from time import time
-import neo
-from neo.util import dump
-from neo.protocol import NodeTypes, NodeStates
+import neo.lib
+from neo.lib.util import dump
+from neo.lib.protocol import NodeTypes, NodeStates
-from neo import attributeTracker
+from neo.lib import attributeTracker
class Node(object):
"""This class represents a node."""
@@ -263,7 +263,7 @@ class NodeManager(object):
def add(self, node):
if node in self._node_set:
- neo.logging.warning('adding a known node %r, ignoring', node)
+ neo.lib.logging.warning('adding a known node %r, ignoring', node)
return
self._node_set.add(node)
self._updateAddress(node, None)
@@ -274,7 +274,7 @@ class NodeManager(object):
def remove(self, node):
if node not in self._node_set:
- neo.logging.warning('removing unknown node %r, ignoring', node)
+ neo.lib.logging.warning('removing unknown node %r, ignoring', node)
return
self._node_set.remove(node)
self.__drop(self._address_dict, node.getAddress())
@@ -446,12 +446,12 @@ class NodeManager(object):
log_args = (node_type, dump(uuid), addr, state)
if node is None:
if state == NodeStates.DOWN:
- neo.logging.debug('NOT creating node %s %s %s %s',
+ neo.lib.logging.debug('NOT creating node %s %s %s %s',
*log_args)
else:
node = self._createNode(klass, address=addr, uuid=uuid,
state=state)
- neo.logging.debug('creating node %r', node)
+ neo.lib.logging.debug('creating node %r', node)
else:
assert isinstance(node, klass), 'node %r is not ' \
'of expected type: %r' % (node, klass)
@@ -460,14 +460,15 @@ class NodeManager(object):
'Discrepancy between node_by_uuid (%r) and ' \
'node_by_addr (%r)' % (node_by_uuid, node_by_addr)
if state == NodeStates.DOWN:
- neo.logging.debug('droping node %r (%r), found with %s ' \
+ neo.lib.logging.debug(
+ 'droping node %r (%r), found with %s ' \
'%s %s %s', node, node.isConnected(), *log_args)
if node.isConnected():
# cut this connection, node removed by handler
node.getConnection().close()
self.remove(node)
else:
- neo.logging.debug('updating node %r to %s %s %s %s',
+ neo.lib.logging.debug('updating node %r to %s %s %s %s',
node, *log_args)
node.setUUID(uuid)
node.setAddress(addr)
@@ -475,12 +476,12 @@ class NodeManager(object):
self.log()
def log(self):
- neo.logging.info('Node manager : %d nodes' % len(self._node_set))
+ neo.lib.logging.info('Node manager : %d nodes' % len(self._node_set))
for node in sorted(list(self._node_set)):
uuid = dump(node.getUUID()) or '-' * 32
address = node.getAddress() or ''
if address:
address = '%s:%d' % address
- neo.logging.info(' * %32s | %8s | %22s | %s' % (
+ neo.lib.logging.info(' * %32s | %8s | %22s | %s' % (
uuid, node.getType(), address, node.getState()))
Copied: trunk/neo/lib/protocol.py (from r2614, trunk/neo/protocol.py)
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/lib/protocol.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,10 +17,10 @@
from struct import pack, unpack, error, calcsize
from socket import inet_ntoa, inet_aton
-from neo.profiling import profiler_decorator
+from neo.lib.profiling import profiler_decorator
from cStringIO import StringIO
-from neo.util import Enum
+from neo.lib.util import Enum
# The protocol version (major, minor).
PROTOCOL_VERSION = (4, 1)
Copied: trunk/neo/lib/pt.py (from r2614, trunk/neo/pt.py)
==============================================================================
--- trunk/neo/pt.py [iso-8859-1] (original)
+++ trunk/neo/lib/pt.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,10 +17,10 @@
import neo
-from neo import protocol
-from neo.protocol import CellStates
-from neo.util import dump, u64
-from neo.locking import RLock
+from neo.lib import protocol
+from neo.lib.protocol import CellStates
+from neo.lib.util import dump, u64
+from neo.lib.locking import RLock
class PartitionTableException(Exception):
"""
@@ -218,7 +218,7 @@ class PartitionTable(object):
# the node must be known by the node manager
assert node is not None
self.setCell(offset, node, state)
- neo.logging.debug('partition table loaded')
+ neo.lib.logging.debug('partition table loaded')
self.log()
def update(self, ptid, cell_list, nm):
@@ -228,14 +228,14 @@ class PartitionTable(object):
is not known, it is created in the node manager and set as unavailable
"""
if ptid <= self._id:
- neo.logging.warning('ignoring older partition changes')
+ neo.lib.logging.warning('ignoring older partition changes')
return
self._id = ptid
for offset, uuid, state in cell_list:
node = nm.getByUUID(uuid)
assert node is not None, 'No node found for uuid %r' % (dump(uuid), )
self.setCell(offset, node, state)
- neo.logging.debug('partition table updated')
+ neo.lib.logging.debug('partition table updated')
self.log()
def filled(self):
@@ -243,7 +243,7 @@ class PartitionTable(object):
def log(self):
for line in self._format():
- neo.logging.debug(line)
+ neo.lib.logging.debug(line)
def format(self):
return '\n'.join(self._format())
Removed: trunk/neo/live_debug.py
==============================================================================
--- trunk/neo/live_debug.py [iso-8859-1] (original)
+++ trunk/neo/live_debug.py (removed)
@@ -1,78 +0,0 @@
-#
-# Copyright (C) 2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-import traceback
-import signal
-import ctypes
-import imp
-import neo
-import pdb
-
-# WARNING: This module should only be used for live application debugging.
-# It - by purpose - allows code injection in a running neo process.
-# You don't want to enable it in a production environment. Really.
-ENABLED = False
-
-# How to include in python code:
-# from neo.live_debug import register
-# register()
-#
-# How to trigger it:
-# Kill python process with:
-# SIGUSR1:
-# Loads (or reloads) neo.debug module.
-# The content is up to you (it's only imported).
-# SIGUSR2:
-# Triggers a pdb prompt on process' controlling TTY.
-
-libc = ctypes.cdll.LoadLibrary('libc.so.6')
-errno = ctypes.c_int.in_dll(libc, 'errno')
-
-def decorate(func):
- def decorator(sig, frame):
- # Save errno value, to restore it after sig handler returns
- old_errno = errno.value
- try:
- func(sig, frame)
- except:
- # Prevent exception from exiting signal handler, so mistakes in
- # "debug" module don't kill process.
- traceback.print_exc()
- errno.value = old_errno
- return decorator
-
-@decorate
-def debugHandler(sig, frame):
- file, filename, (suffix, mode, type) = imp.find_module('debug',
- neo.__path__)
- imp.load_module('neo.debug', file, filename, (suffix, mode, type))
-
-@decorate
-def pdbHandler(sig, frame):
- pdb.set_trace()
-
-def register(on_log=None):
- if ENABLED:
- signal.signal(signal.SIGUSR1, debugHandler)
- signal.signal(signal.SIGUSR2, pdbHandler)
- if on_log is not None:
- # use 'kill -RTMIN <pid>
- @decorate
- def on_log_signal(signum, signal):
- on_log()
- signal.signal(signal.SIGRTMIN, on_log_signal)
-
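Editor's note: the decorate() helper in the removed live_debug.py keeps a failing debug handler from killing the process. A reduced sketch of that idea follows, written so that it can be exercised without sending real signals. It deliberately leaves out the errno save and restore done through ctypes in the original, and failing_handler and calls are hypothetical names used only for the demonstration.

```python
import functools
import traceback


def decorate(func):
    """Wrap a signal handler so that exceptions are printed, not raised."""
    @functools.wraps(func)
    def wrapper(sig, frame):
        try:
            func(sig, frame)
        except Exception:
            # Report the mistake on stderr and carry on.
            traceback.print_exc()
    return wrapper


calls = []


@decorate
def failing_handler(sig, frame):
    # Hypothetical handler that records the signal number, then fails.
    calls.append(sig)
    raise RuntimeError('mistake in debug code')
```

Calling failing_handler(10, None) prints a traceback and returns normally. A handler wrapped this way could then be passed to signal.signal(), as register() does in the original module.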
Removed: trunk/neo/locking.py
==============================================================================
--- trunk/neo/locking.py [iso-8859-1] (original)
+++ trunk/neo/locking.py (removed)
@@ -1,177 +0,0 @@
-from threading import Lock as threading_Lock
-from threading import RLock as threading_RLock
-from threading import currentThread
-from Queue import Queue as Queue_Queue
-from Queue import Empty
-
-"""
- Verbose locking classes.
-
- Python threading module contains a simple logging mechanism, but:
- - It's limitted to RLock class
- - It's enabled instance by instance
- - Choice to log or not is done at instanciation
- - It does not emit any log before trying to acquire lock
-
- This file defines a VerboseLock class implementing basic lock API and
- logging in appropriate places with extensive details.
-
- It can be globaly toggled by changing VERBOSE_LOCKING value.
- There is no overhead at all when disabled (passthrough to threading
- classes).
-"""
-
-__all__ = ['Lock', 'RLock', 'Queue', 'Empty']
-
-VERBOSE_LOCKING = False
-
-import traceback
-import sys
-import os
-
-class LockUser(object):
- def __init__(self, level=0):
- self.ident = currentThread().getName()
- # This class is instanciated from a place desiring to known what
- # called it.
- # limit=1 would return execution position in this method
- # limit=2 would return execution position in caller
- # limit=3 returns execution position in caller's caller
- # Additionnal level value (should be positive only) can be used when
- # more intermediate calls are involved
- self.stack = stack = traceback.extract_stack()[:-(2 + level)]
- path, line_number, func_name, line = stack[-1]
- # Simplify path. Only keep 3 last path elements. It is enough for
- # current Neo directory structure.
- path = os.path.join('...', *path.split(os.path.sep)[-3:])
- self.caller = (path, line_number, func_name, line)
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.ident == other.ident
-
- def __repr__(self):
- return '%s@%s:%s %s' % (self.ident, self.caller[0], self.caller[1],
- self.caller[3])
-
- def formatStack(self):
- return ''.join(traceback.format_list(self.stack))
-
-class VerboseLockBase(object):
- def __init__(self, reentrant=False, debug_lock=False):
- self.reentrant = reentrant
- self.debug_lock = debug_lock
- self.owner = None
- self.waiting = []
- self._note('%s@%X created by %r', self.__class__.__name__, id(self),
- LockUser(1))
-
- def _note(self, fmt, *args):
- sys.stderr.write(fmt % args + '\n')
- sys.stderr.flush()
-
- def _getOwner(self):
- if self._locked():
- owner = self.owner
- else:
- owner = None
- return owner
-
- def acquire(self, blocking=1):
- me = LockUser()
- owner = self._getOwner()
- self._note('[%r]%s.acquire(%s) Waiting for lock. Owned by:%r ' \
- 'Waiting:%r', me, self, blocking, owner, self.waiting)
- if (self.debug_lock and owner is not None) or \
- (not self.reentrant and blocking and me == owner):
- if me == owner:
- self._note('[%r]%s.acquire(%s): Deadlock detected: ' \
- ' I already own this lock:%r', me, self, blocking, owner)
- else:
- self._note('[%r]%s.acquire(%s): debug lock triggered: %r',
- me, self, blocking, owner)
- self._note('Owner traceback:\n%s', owner.formatStack())
- self._note('My traceback:\n%s', me.formatStack())
- self.waiting.append(me)
- try:
- return self.lock.acquire(blocking)
- finally:
- self.owner = me
- self.waiting.remove(me)
- self._note('[%r]%s.acquire(%s) Lock granted. Waiting: %r',
- me, self, blocking, self.waiting)
-
- def release(self):
- me = LockUser()
- self._note('[%r]%s.release() Waiting: %r', me, self, self.waiting)
- return self.lock.release()
-
- def _locked(self):
- raise NotImplementedError
-
- def __repr__(self):
- return '<%s@%X>' % (self.__class__.__name__, id(self))
-
-class VerboseRLock(VerboseLockBase):
- def __init__(self, verbose=None, debug_lock=False):
- super(VerboseRLock, self).__init__(reentrant=True,
- debug_lock=debug_lock)
- self.lock = threading_RLock()
-
- def _locked(self):
- return self.lock._RLock__block.locked()
-
- def _is_owned(self):
- return self.lock._is_owned()
-
-class VerboseLock(VerboseLockBase):
- def __init__(self, verbose=None, debug_lock=False):
- super(VerboseLock, self).__init__(debug_lock=debug_lock)
- self.lock = threading_Lock()
-
- def locked(self):
- return self.lock.locked()
- _locked = locked
-
-class VerboseQueue(Queue_Queue):
- def __init__(self, maxsize=0):
- if maxsize <= 0:
- self.put = self._verbose_put
- Queue_Queue.__init__(self, maxsize=maxsize)
-
- def _verbose_note(self, fmt, *args):
- sys.stderr.write(fmt % args + '\n')
- sys.stderr.flush()
-
- def get(self, block=True, timeout=None):
- note = self._verbose_note
- me = '[%r]%s.get(block=%r, timeout=%r)' % (LockUser(), self, block, timeout)
- note('%s waiting', me)
- try:
- result = Queue_Queue.get(self, block=block, timeout=timeout)
- except Exception, exc:
- note('%s got exeption %r', me, exc)
- raise
- note('%s got item', me)
- return result
-
- def _verbose_put(self, item, block=True, timeout=None):
- note = self._verbose_note
- me = '[%r]%s.put(..., block=%r, timeout=%r)' % (LockUser(), self, block, timeout)
- try:
- Queue_Queue.put(self, item, block=block, timeout=timeout)
- except Exception, exc:
- note('%s got exeption %r', me, exc)
- raise
- note('%s put item', me)
-
- def __repr__(self):
- return '<%s@%X>' % (self.__class__.__name__, id(self))
-
-if VERBOSE_LOCKING:
- Lock = VerboseLock
- RLock = VerboseRLock
- Queue = VerboseQueue
-else:
- Lock = threading_Lock
- RLock = threading_RLock
- Queue = Queue_Queue
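Note on the locking module above: its docstring describes a lock wrapper that logs before and after each acquire, and that is selected once through a module-level flag so the plain threading classes are used when logging is off. The following is a minimal sketch of that pattern, not the removed code. It assumes Python 3 (the removed file is Python 2), and the names NoisyLock, make_lock and the sink parameter are hypothetical and do not exist in NEO. Unlike the removed acquire(), which sets the owner in a finally clause, this sketch records the owner only when the acquire succeeds.

```python
import sys
import threading


def _stderr_sink(message):
    # Default destination for log lines, mirroring the stderr output above.
    sys.stderr.write(message + '\n')
    sys.stderr.flush()


class NoisyLock(object):
    """Hypothetical wrapper that logs around a plain threading.Lock."""

    def __init__(self, sink=None):
        self._lock = threading.Lock()
        # 'sink' is an illustrative hook so callers can capture messages.
        self._sink = sink if sink is not None else _stderr_sink
        self.owner = None

    def _note(self, fmt, *args):
        self._sink(fmt % args)

    def acquire(self, blocking=True):
        me = threading.current_thread().name
        # Log before trying to take the lock, as the docstring calls for.
        self._note('[%s] acquire(%s): waiting, owned by %r',
                   me, blocking, self.owner)
        granted = self._lock.acquire(blocking)
        if granted:
            self.owner = me
            self._note('[%s] acquire(%s): granted', me, blocking)
        else:
            self._note('[%s] acquire(%s): not granted', me, blocking)
        return granted

    def release(self):
        me = threading.current_thread().name
        self._note('[%s] release()', me)
        self.owner = None
        self._lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *exc_info):
        self.release()


def make_lock(verbose, sink=None):
    """Return a plain lock when verbose is off, so nothing is wrapped."""
    if verbose:
        return NoisyLock(sink)
    return threading.Lock()
```

Passing a list's append method as the sink, for example make_lock(True, messages.append), collects the log lines in memory instead of writing them to stderr, which is convenient when checking the order of acquire and release calls.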
Removed: trunk/neo/logger.py
==============================================================================
--- trunk/neo/logger.py [iso-8859-1] (original)
+++ trunk/neo/logger.py (removed)
@@ -1,72 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-import neo
-from neo.protocol import PacketMalformedError
-from neo.util import dump
-from neo.handler import EventHandler
-from neo.profiling import profiler_decorator
-
-LOGGER_ENABLED = False
-
-class PacketLogger(object):
- """ Logger at packet level (for debugging purpose) """
-
- def __init__(self):
- _temp = EventHandler(None)
- self.packet_dispatch_table = _temp.packet_dispatch_table
- self.error_dispatch_table = _temp.error_dispatch_table
-
- def dispatch(self, conn, packet, direction):
- """This is a helper method to handle various packet types."""
- # default log message
- klass = packet.getType()
- uuid = dump(conn.getUUID())
- ip, port = conn.getAddress()
- neo.logging.debug('#0x%08x %-30s %s %s (%s:%d)', packet.getId(),
- packet.__class__.__name__, direction, uuid, ip, port)
- # look for custom packet logger
- logger = self.packet_dispatch_table.get(klass, None)
- logger = logger and getattr(self, logger.im_func.__name__, None)
- if logger is None:
- return
- # enhanced log
- try:
- args = packet.decode() or ()
- except PacketMalformedError:
- neo.logging.warning("Can't decode packet for logging")
- return
- log_message = logger(conn, *args)
- if log_message is not None:
- neo.logging.debug('#0x%08x %s', packet.getId(), log_message)
-
- def error(self, conn, code, message):
- return "%s (%s)" % (code, message)
-
- def notifyNodeInformation(self, conn, node_list):
- for node_type, address, uuid, state in node_list:
- if address is not None:
- address = '%s:%d' % address
- else:
- address = '?'
- node = (dump(uuid), node_type, address, state)
- neo.logging.debug(' ! %s | %8s | %22s | %s' % node)
-
-PACKET_LOGGER = PacketLogger()
-if not LOGGER_ENABLED:
- # disable logger
- PACKET_LOGGER.dispatch = lambda *args, **kw: None
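Note on PacketLogger.dispatch above: it looks up the handler method registered for a packet type, then reuses that method's name to find a same-named formatter on the logger itself, and stays silent when no formatter exists. A small sketch of that lookup follows. It assumes Python 3, where a bound method exposes __name__ directly instead of through im_func. SketchHandler, SketchPacketLogger, describe and the string packet types are hypothetical stand-ins, not NEO's EventHandler or Packets API.

```python
class SketchHandler(object):
    """Hypothetical handler exposing a type -> bound method table."""

    def __init__(self):
        self.packet_dispatch_table = {
            'Error': self.error,
            'Ping': self.ping,
        }

    def error(self, conn, code, message):
        pass

    def ping(self, conn):
        pass


class SketchPacketLogger(object):
    """Formats a packet only if a method with the handler's name exists here."""

    def __init__(self, handler):
        self.packet_dispatch_table = handler.packet_dispatch_table

    def describe(self, packet_type, conn, *args):
        method = self.packet_dispatch_table.get(packet_type)
        # Map the handler method to a same-named formatter on this object.
        formatter = method and getattr(self, method.__name__, None)
        if formatter is None:
            return None
        return formatter(conn, *args)

    def error(self, conn, code, message):
        return '%s (%s)' % (code, message)
```

With this layout, adding an enhanced log line for a new packet type only requires defining a method on the logger with the same name as the handler method; packet types without one fall through and return None.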
Modified: trunk/neo/master/app.py
==============================================================================
--- trunk/neo/master/app.py [iso-8859-1] (original)
+++ trunk/neo/master/app.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -19,13 +19,13 @@ import neo
import os, sys
from time import time
-from neo import protocol
-from neo.protocol import UUID_NAMESPACES, ZERO_TID
-from neo.protocol import ClusterStates, NodeStates, NodeTypes, Packets
-from neo.node import NodeManager
-from neo.event import EventManager
-from neo.connection import ListeningConnection, ClientConnection
-from neo.exception import ElectionFailure, PrimaryFailure, OperationFailure
+from neo.lib import protocol
+from neo.lib.protocol import UUID_NAMESPACES, ZERO_TID
+from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
+from neo.lib.node import NodeManager
+from neo.lib.event import EventManager
+from neo.lib.connection import ListeningConnection, ClientConnection
+from neo.lib.exception import ElectionFailure, PrimaryFailure, OperationFailure
from neo.master.handlers import election, identification, secondary
from neo.master.handlers import storage, client, shutdown
from neo.master.handlers import administration
@@ -33,10 +33,10 @@ from neo.master.pt import PartitionTable
from neo.master.transactions import TransactionManager
from neo.master.verification import VerificationManager
from neo.master.recovery import RecoveryManager
-from neo.util import dump
-from neo.connector import getConnectorHandler
+from neo.lib.util import dump
+from neo.lib.connector import getConnectorHandler
-from neo.live_debug import register as registerLiveDebugger
+from neo.lib.live_debug import register as registerLiveDebugger
class Application(object):
"""The master node application."""
@@ -61,7 +61,7 @@ class Application(object):
for address in config.getMasters():
self.nm.createMaster(address=address)
- neo.logging.debug('IP address is %s, port is %d', *(self.server))
+ neo.lib.logging.debug('IP address is %s, port is %d', *(self.server))
# Partition table
replicas, partitions = config.getReplicas(), config.getPartitions()
@@ -70,10 +70,10 @@ class Application(object):
if partitions <= 0:
raise RuntimeError, 'partitions must be more than zero'
self.pt = PartitionTable(partitions, replicas)
- neo.logging.info('Configuration:')
- neo.logging.info('Partitions: %d', partitions)
- neo.logging.info('Replicas : %d', replicas)
- neo.logging.info('Name : %s', self.name)
+ neo.lib.logging.info('Configuration:')
+ neo.lib.logging.info('Partitions: %d', partitions)
+ neo.lib.logging.info('Replicas : %d', replicas)
+ neo.lib.logging.info('Name : %s', self.name)
self.listening_conn = None
self.primary = None
@@ -86,7 +86,7 @@ class Application(object):
if uuid is None or uuid == '':
uuid = self.getNewUUID(NodeTypes.MASTER)
self.uuid = uuid
- neo.logging.info('UUID : %s', dump(uuid))
+ neo.lib.logging.info('UUID : %s', dump(uuid))
# election related data
self.unconnected_master_node_set = set()
@@ -107,7 +107,7 @@ class Application(object):
try:
self._run()
except:
- neo.logging.info('\nPre-mortem informations:')
+ neo.lib.logging.info('\nPre-mortem informations:')
self.log()
raise
@@ -145,7 +145,7 @@ class Application(object):
others while attempting to connect to other master nodes at the
same time. Note that storage nodes and client nodes may connect
to self as well as master nodes."""
- neo.logging.info('begin the election of a primary master')
+ neo.lib.logging.info('begin the election of a primary master')
self.unconnected_master_node_set.clear()
self.negotiating_master_node_set.clear()
@@ -199,7 +199,7 @@ class Application(object):
for node in self.nm.getMasterList():
if not node.isRunning() and node.getLastStateChange() + \
expiration < current_time:
- neo.logging.info('%s is down' % (node, ))
+ neo.lib.logging.info('%s is down' % (node, ))
node.setDown()
self.unconnected_master_node_set.discard(
node.getAddress())
@@ -222,7 +222,7 @@ class Application(object):
Broadcast the announce that I'm the primary
"""
# I am the primary.
- neo.logging.debug('I am the primary, sending an announcement')
+ neo.lib.logging.debug('I am the primary, sending an announcement')
for conn in self.em.getClientList():
conn.notify(Packets.AnnouncePrimary())
conn.abort()
@@ -239,7 +239,7 @@ class Application(object):
"""
Ask other masters to reelect a primary after an election failure.
"""
- neo.logging.error('election failed: %s', (m, ))
+ neo.lib.logging.error('election failed: %s', (m, ))
# Ask all connected nodes to reelect a single primary master.
for conn in self.em.getClientList():
@@ -288,7 +288,7 @@ class Application(object):
def broadcastPartitionChanges(self, cell_list, selector=None):
"""Broadcast a Notify Partition Changes packet."""
- neo.logging.debug('broadcastPartitionChanges')
+ neo.lib.logging.debug('broadcastPartitionChanges')
if not cell_list:
return
if not selector:
@@ -308,7 +308,8 @@ class Application(object):
def broadcastLastOID(self):
oid = self.tm.getLastOID()
- neo.logging.debug('Broadcast last OID to storages : %s' % dump(oid))
+ neo.lib.logging.debug(
+ 'Broadcast last OID to storages : %s' % dump(oid))
packet = Packets.NotifyLastOID(oid)
for node in self.nm.getStorageList(only_identified=True):
node.notify(packet)
@@ -319,7 +320,7 @@ class Application(object):
and stop the service only if a catastrophy happens or the user commits
a shutdown.
"""
- neo.logging.info('provide service')
+ neo.lib.logging.info('provide service')
em = self.em
self.tm.reset()
@@ -332,7 +333,7 @@ class Application(object):
except OperationFailure:
# If not operational, send Stop Operation packets to storage
# nodes and client nodes. Abort connections to client nodes.
- neo.logging.critical('No longer operational')
+ neo.lib.logging.critical('No longer operational')
for node in self.nm.getIdentifiedList():
if node.isStorage() or node.isClient():
node.notify(Packets.StopOperation())
@@ -345,7 +346,8 @@ class Application(object):
return
def playPrimaryRole(self):
- neo.logging.info('play the primary role with %r', self.listening_conn)
+ neo.lib.logging.info(
+ 'play the primary role with %r', self.listening_conn)
# i'm the primary, send the announcement
self._announcePrimary()
@@ -382,7 +384,7 @@ class Application(object):
"""
I play a secondary role, thus only wait for a primary master to fail.
"""
- neo.logging.info('play the secondary role with %r',
+ neo.lib.logging.info('play the secondary role with %r',
self.listening_conn)
# Wait for an announcement. If this is too long, probably
@@ -496,7 +498,7 @@ class Application(object):
self.em.poll(1)
if self.cluster_state != ClusterStates.RUNNING:
- neo.logging.info("asking all nodes to shutdown")
+ neo.lib.logging.info("asking all nodes to shutdown")
# This code sends packets but never polls, so they never reach
# network.
for node in self.nm.getIdentifiedList():
@@ -533,7 +535,7 @@ class Application(object):
# always accept admin nodes
node_ctor = self.nm.createAdmin
handler = administration.AdministrationHandler(self)
- neo.logging.info('Accept an admin %s' % (dump(uuid), ))
+ neo.lib.logging.info('Accept an admin %s' % (dump(uuid), ))
elif node_type == NodeTypes.MASTER:
if node is None:
# unknown master, rejected
@@ -541,15 +543,15 @@ class Application(object):
# always put other master in waiting state
node_ctor = self.nm.createMaster
handler = secondary.SecondaryMasterHandler(self)
- neo.logging.info('Accept a master %s' % (dump(uuid), ))
+ neo.lib.logging.info('Accept a master %s' % (dump(uuid), ))
elif node_type == NodeTypes.CLIENT:
# refuse any client before running
if self.cluster_state != ClusterStates.RUNNING:
- neo.logging.info('Reject a connection from a client')
+ neo.lib.logging.info('Reject a connection from a client')
raise protocol.NotReadyError
node_ctor = self.nm.createClient
handler = client.ClientServiceHandler(self)
- neo.logging.info('Accept a client %s' % (dump(uuid), ))
+ neo.lib.logging.info('Accept a client %s' % (dump(uuid), ))
elif node_type == NodeTypes.STORAGE:
node_ctor = self.nm.createStorage
if self._current_manager is not None:
@@ -557,7 +559,8 @@ class Application(object):
(uuid, state, handler) = identify(uuid, node)
else:
(uuid, state, handler) = self.identifyStorageNode(uuid, node)
- neo.logging.info('Accept a storage %s (%s)' % (dump(uuid), state))
+ neo.lib.logging.info('Accept a storage %s (%s)' %
+ (dump(uuid), state))
return (uuid, node, state, handler, node_ctor)
def onTransactionCommitted(self, txn):
Modified: trunk/neo/master/handlers/__init__.py
==============================================================================
--- trunk/neo/master/handlers/__init__.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/__init__.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,15 +17,16 @@
import neo
-from neo.handler import EventHandler
-from neo.protocol import NodeTypes, NodeStates, Packets
-from neo.util import dump
+from neo.lib.handler import EventHandler
+from neo.lib.protocol import NodeTypes, NodeStates, Packets
+from neo.lib.util import dump
class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
def protocolError(self, conn, message):
- neo.logging.error('Protocol error %s %s', message, conn.getAddress())
+ neo.lib.logging.error(
+ 'Protocol error %s %s', message, conn.getAddress())
def askPrimary(self, conn):
app = self.app
@@ -94,7 +95,7 @@ class BaseServiceHandler(MasterHandler):
if new_state != NodeStates.BROKEN and was_pending:
# was in pending state, so drop it from the node manager to forget
# it and do not set in running state when it comes back
- neo.logging.info('drop a pending node from the node manager')
+ neo.lib.logging.info('drop a pending node from the node manager')
self.app.nm.remove(node)
self.app.broadcastNodesInformation([node])
# clean node related data in specialized handlers
Modified: trunk/neo/master/handlers/administration.py
==============================================================================
--- trunk/neo/master/handlers/administration.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/administration.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -18,9 +18,9 @@
import neo
from neo.master.handlers import MasterHandler
-from neo.protocol import ClusterStates, NodeStates, Packets, ProtocolError
-from neo.protocol import Errors
-from neo.util import dump
+from neo.lib.protocol import ClusterStates, NodeStates, Packets, ProtocolError
+from neo.lib.protocol import Errors
+from neo.lib.util import dump
CLUSTER_STATE_WORKFLOW = {
# destination: sources
@@ -63,7 +63,7 @@ class AdministrationHandler(MasterHandle
self.app.shutdown()
def setNodeState(self, conn, uuid, state, modify_partition_table):
- neo.logging.info("set node state for %s-%s : %s" %
+ neo.lib.logging.info("set node state for %s-%s : %s" %
(dump(uuid), state, modify_partition_table))
app = self.app
node = app.nm.getByUUID(uuid)
@@ -119,7 +119,7 @@ class AdministrationHandler(MasterHandle
def addPendingNodes(self, conn, uuid_list):
uuids = ', '.join([dump(uuid) for uuid in uuid_list])
- neo.logging.debug('Add nodes %s' % uuids)
+ neo.lib.logging.debug('Add nodes %s' % uuids)
app = self.app
nm = app.nm
em = app.em
@@ -136,11 +136,11 @@ class AdministrationHandler(MasterHandle
uuid_set = uuid_set.intersection(set(uuid_list))
# nothing to do
if not uuid_set:
- neo.logging.warning('No nodes added')
+ neo.lib.logging.warning('No nodes added')
conn.answer(Errors.Ack('No nodes added'))
return
uuids = ', '.join([dump(uuid) for uuid in uuid_set])
- neo.logging.info('Adding nodes %s' % uuids)
+ neo.lib.logging.info('Adding nodes %s' % uuids)
# switch nodes to running state
node_list = [nm.getByUUID(uuid) for uuid in uuid_set]
for node in node_list:
Modified: trunk/neo/master/handlers/client.py
==============================================================================
--- trunk/neo/master/handlers/client.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/client.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,11 +15,11 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-import neo
+import neo.lib
-from neo.protocol import NodeStates, Packets, ProtocolError
+from neo.lib.protocol import NodeStates, Packets, ProtocolError
from neo.master.handlers import MasterHandler
-from neo.util import dump
+from neo.lib.util import dump
from neo.master.transactions import DelayedError
class ClientServiceHandler(MasterHandler):
Modified: trunk/neo/master/handlers/election.py
==============================================================================
--- trunk/neo/master/handlers/election.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/election.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,14 +15,15 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-import neo
+import neo.lib
-from neo.protocol import NodeTypes, Packets
-from neo.protocol import NotReadyError, ProtocolError, UnexpectedPacketError
-from neo.protocol import BrokenNodeDisallowedError
+from neo.lib.protocol import NodeTypes, Packets
+from neo.lib.protocol import NotReadyError, ProtocolError, \
+ UnexpectedPacketError
+from neo.lib.protocol import BrokenNodeDisallowedError
from neo.master.handlers import MasterHandler
-from neo.exception import ElectionFailure
-from neo.util import dump
+from neo.lib.exception import ElectionFailure
+from neo.lib.util import dump
class ClientElectionHandler(MasterHandler):
@@ -90,7 +91,7 @@ class ClientElectionHandler(MasterHandle
node = app.nm.getByAddress(conn.getAddress())
if node_type != NodeTypes.MASTER:
# The peer is not a master node!
- neo.logging.error('%r is not a master node', conn)
+ neo.lib.logging.error('%r is not a master node', conn)
app.nm.remove(node)
app.negotiating_master_node_set.discard(node.getAddress())
conn.close()
@@ -99,7 +100,8 @@ class ClientElectionHandler(MasterHandle
if your_uuid != app.uuid:
# uuid conflict happened, accept the new one and restart election
app.uuid = your_uuid
- neo.logging.info('UUID conflict, new UUID: %s', dump(your_uuid))
+ neo.lib.logging.info('UUID conflict, new UUID: %s',
+ dump(your_uuid))
raise ElectionFailure, 'new uuid supplied'
conn.setUUID(uuid)
@@ -138,7 +140,8 @@ class ClientElectionHandler(MasterHandle
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
- neo.logging.warning('received an unknown primary node UUID')
+ neo.lib.logging.warning(
+ 'received an unknown primary node UUID')
else:
# Whatever the situation is, I trust this master.
app.primary = False
@@ -196,11 +199,11 @@ class ServerElectionHandler(MasterHandle
self.checkClusterName(name)
app = self.app
if node_type != NodeTypes.MASTER:
- neo.logging.info('reject a connection from a non-master')
+ neo.lib.logging.info('reject a connection from a non-master')
raise NotReadyError
node = app.nm.getByAddress(address)
if node is None:
- neo.logging.error('unknown master node: %s' % (address, ))
+ neo.lib.logging.error('unknown master node: %s' % (address, ))
raise ProtocolError('unknown master node')
# If this node is broken, reject it.
if node.getUUID() == uuid:
@@ -236,5 +239,5 @@ class ServerElectionHandler(MasterHandle
app.primary_master_node = node
app.unconnected_master_node_set.clear()
app.negotiating_master_node_set.clear()
- neo.logging.info('%s is the primary', node)
+ neo.lib.logging.info('%s is the primary', node)
Modified: trunk/neo/master/handlers/identification.py
==============================================================================
--- trunk/neo/master/handlers/identification.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/identification.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,15 +17,16 @@
import neo
-from neo.protocol import NodeTypes, Packets
-from neo.protocol import BrokenNodeDisallowedError, ProtocolError
+from neo.lib.protocol import NodeTypes, Packets
+from neo.lib.protocol import BrokenNodeDisallowedError, ProtocolError
from neo.master.handlers import MasterHandler
class IdentificationHandler(MasterHandler):
"""This class deals with messages from the admin node only"""
def nodeLost(self, conn, node):
- neo.logging.warning('lost a node in IdentificationHandler : %s' % node)
+ neo.lib.logging.warning(
+ 'lost a node in IdentificationHandler : %s' % node)
def requestIdentification(self, conn, node_type, uuid, address, name):
Modified: trunk/neo/master/handlers/secondary.py
==============================================================================
--- trunk/neo/master/handlers/secondary.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/secondary.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -16,8 +16,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.master.handlers import MasterHandler
-from neo.exception import ElectionFailure, PrimaryFailure
-from neo.protocol import NodeTypes, Packets
+from neo.lib.exception import ElectionFailure, PrimaryFailure
+from neo.lib.protocol import NodeTypes, Packets
class SecondaryMasterHandler(MasterHandler):
""" Handler used by primary to handle secondary masters"""
Modified: trunk/neo/master/handlers/shutdown.py
==============================================================================
--- trunk/neo/master/handlers/shutdown.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/shutdown.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,8 +15,8 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-import neo
-from neo import protocol
+import neo.lib
+from neo.lib import protocol
from neo.master.handlers import BaseServiceHandler
class ShutdownHandler(BaseServiceHandler):
@@ -24,15 +24,15 @@ class ShutdownHandler(BaseServiceHandler
def requestIdentification(self, conn, node_type,
uuid, address, name):
- neo.logging.error('reject any new connection')
+ neo.lib.logging.error('reject any new connection')
raise protocol.ProtocolError('cluster is shutting down')
def askPrimary(self, conn):
- neo.logging.error('reject any new demand for primary master')
+ neo.lib.logging.error('reject any new demand for primary master')
raise protocol.ProtocolError('cluster is shutting down')
def askBeginTransaction(self, conn, tid):
- neo.logging.error('reject any new demand for new tid')
+ neo.lib.logging.error('reject any new demand for new tid')
raise protocol.ProtocolError('cluster is shutting down')
Modified: trunk/neo/master/handlers/storage.py
==============================================================================
--- trunk/neo/master/handlers/storage.py [iso-8859-1] (original)
+++ trunk/neo/master/handlers/storage.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,15 +15,15 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-import neo
+import neo.lib
-from neo.protocol import ProtocolError
-from neo.protocol import Packets
+from neo.lib.protocol import ProtocolError
+from neo.lib.protocol import Packets
from neo.master.handlers import BaseServiceHandler
-from neo.exception import OperationFailure
-from neo.util import dump
-from neo.connector import ConnectorConnectionClosedException
-from neo.pt import PartitionTableException
+from neo.lib.exception import OperationFailure
+from neo.lib.util import dump
+from neo.lib.connector import ConnectorConnectionClosedException
+from neo.lib.pt import PartitionTableException
class StorageServiceHandler(BaseServiceHandler):
@@ -40,7 +40,7 @@ class StorageServiceHandler(BaseServiceH
conn.notify(Packets.StartOperation())
def nodeLost(self, conn, node):
- neo.logging.info('storage node lost')
+ neo.lib.logging.info('storage node lost')
assert not node.isRunning(), node.getState()
if not self.app.pt.operational():
@@ -71,7 +71,7 @@ class StorageServiceHandler(BaseServiceH
def notifyReplicationDone(self, conn, offset):
node = self.app.nm.getByUUID(conn.getUUID())
- neo.logging.debug("%s is up for offset %s" % (node, offset))
+ neo.lib.logging.debug("%s is up for offset %s" % (node, offset))
try:
cell_list = self.app.pt.setUpToDate(node, offset)
except PartitionTableException, e:
Modified: trunk/neo/master/pt.py
==============================================================================
--- trunk/neo/master/pt.py [iso-8859-1] (original)
+++ trunk/neo/master/pt.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,12 +15,13 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-import neo.pt
+import neo.lib.pt
from struct import pack, unpack
-from neo.protocol import CellStates
-from neo.pt import PartitionTableException
+from neo.lib.protocol import CellStates
+from neo.lib.pt import PartitionTableException
+from neo.lib.pt import PartitionTable
-class PartitionTable(neo.pt.PartitionTable):
+class PartitionTable(PartitionTable):
"""This class manages a partition table for the primary master node"""
def setID(self, id):
@@ -53,7 +54,7 @@ class PartitionTable(neo.pt.PartitionTab
row = []
for _ in xrange(repeats):
node = node_list[index]
- row.append(neo.pt.Cell(node))
+ row.append(neo.lib.pt.Cell(node))
self.count_dict[node] = self.count_dict.get(node, 0) + 1
index += 1
if index == len(node_list):
@@ -87,7 +88,8 @@ class PartitionTable(neo.pt.PartitionTab
node_list = [c.getNode() for c in row]
n = self.findLeastUsedNode(node_list)
if n is not None:
- row.append(neo.pt.Cell(n, CellStates.OUT_OF_DATE))
+ row.append(neo.lib.pt.Cell(n,
+ CellStates.OUT_OF_DATE))
self.count_dict[n] += 1
cell_list.append((offset, n.getUUID(),
CellStates.OUT_OF_DATE))
@@ -180,7 +182,7 @@ class PartitionTable(neo.pt.PartitionTab
continue
if num_cells <= self.nr:
- row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE))
+ row.append(neo.lib.pt.Cell(node, CellStates.OUT_OF_DATE))
cell_list.append((offset, node.getUUID(),
CellStates.OUT_OF_DATE))
node_count += 1
@@ -200,7 +202,7 @@ class PartitionTable(neo.pt.PartitionTab
CellStates.FEEDING))
# Don't count a feeding cell.
self.count_dict[max_cell.getNode()] -= 1
- row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE))
+ row.append(neo.lib.pt.Cell(node, CellStates.OUT_OF_DATE))
cell_list.append((offset, node.getUUID(),
CellStates.OUT_OF_DATE))
node_count += 1
@@ -281,7 +283,7 @@ class PartitionTable(neo.pt.PartitionTab
node = self.findLeastUsedNode([cell.getNode() for cell in row])
if node is None:
break
- row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE))
+ row.append(neo.lib.pt.Cell(node, CellStates.OUT_OF_DATE))
changed_cell_list.append((offset, node.getUUID(),
CellStates.OUT_OF_DATE))
self.count_dict[node] += 1
Modified: trunk/neo/master/recovery.py
==============================================================================
--- trunk/neo/master/recovery.py [iso-8859-1] (original)
+++ trunk/neo/master/recovery.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -18,9 +18,9 @@
from struct import pack
import neo
-from neo.util import dump
-from neo.protocol import Packets, ProtocolError, ClusterStates, NodeStates
-from neo.protocol import NotReadyError, ZERO_OID, ZERO_TID
+from neo.lib.util import dump
+from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates
+from neo.lib.protocol import NotReadyError, ZERO_OID, ZERO_TID
from neo.master.handlers import MasterHandler
REQUIRED_NODE_NUMBER = 1
@@ -43,7 +43,7 @@ class RecoveryManager(MasterHandler):
Returns the handler for storage nodes
"""
if uuid is None and not self.app._startup_allowed:
- neo.logging.info('reject empty storage node')
+ neo.lib.logging.info('reject empty storage node')
raise NotReadyError
return (uuid, NodeStates.RUNNING, self)
@@ -54,7 +54,7 @@ class RecoveryManager(MasterHandler):
back the latest partition table or make a new table from scratch,
if this is the first time.
"""
- neo.logging.info('begin the recovery of the status')
+ neo.lib.logging.info('begin the recovery of the status')
self.app.changeClusterState(ClusterStates.RECOVERING)
em = self.app.em
@@ -66,7 +66,7 @@ class RecoveryManager(MasterHandler):
while not self.app._startup_allowed:
em.poll(1)
- neo.logging.info('startup allowed')
+ neo.lib.logging.info('startup allowed')
# build a new partition table
if self.app.pt.getID() is None:
@@ -81,13 +81,14 @@ class RecoveryManager(MasterHandler):
self.app.broadcastNodesInformation(refused_node_set)
self.app.setLastTransaction(self.app.tm.getLastTID())
- neo.logging.debug('cluster starts with loid=%s and this partition ' \
- 'table :', dump(self.app.tm.getLastOID()))
+ neo.lib.logging.debug(
+ 'cluster starts with loid=%s and this partition ' \
+ 'table :', dump(self.app.tm.getLastOID()))
self.app.pt.log()
def buildFromScratch(self):
nm, em, pt = self.app.nm, self.app.em, self.app.pt
- neo.logging.debug('creating a new partition table, wait for a ' \
+ neo.lib.logging.debug('creating a new partition table, wait for a ' \
'storage node')
# wait for some empty storage nodes, their are accepted
while len(nm.getStorageList()) < REQUIRED_NODE_NUMBER:
@@ -131,7 +132,7 @@ class RecoveryManager(MasterHandler):
def answerPartitionTable(self, conn, ptid, row_list):
if ptid != self.target_ptid:
# If this is not from a target node, ignore it.
- neo.logging.warn('Got %s while waiting %s', dump(ptid),
+ neo.lib.logging.warn('Got %s while waiting %s', dump(ptid),
dump(self.target_ptid))
return
try:
Modified: trunk/neo/master/transactions.py
==============================================================================
--- trunk/neo/master/transactions.py [iso-8859-1] (original)
+++ trunk/neo/master/transactions.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,10 +17,10 @@
from time import time, gmtime
from struct import pack, unpack
-from neo.protocol import ZERO_TID
+from neo.lib.protocol import ZERO_TID
from datetime import timedelta, datetime
-from neo.util import dump, u64, p64
-import neo
+from neo.lib.util import dump, u64, p64
+import neo.lib
TID_LOW_OVERFLOW = 2**32
TID_LOW_MAX = TID_LOW_OVERFLOW - 1
@@ -347,7 +347,8 @@ class TransactionManager(object):
else:
tid = self._nextTID(ttid, divisor)
self._queue.append((node.getUUID(), ttid))
- neo.logging.debug('Finish TXN %s for %s (was %s)', dump(tid), node, dump(ttid))
+ neo.lib.logging.debug('Finish TXN %s for %s (was %s)',
+ dump(tid), node, dump(ttid))
txn = Transaction(node, ttid, tid, oid_list, uuid_list, msg_id)
self._ttid_dict[ttid] = txn
self._node_dict.setdefault(node, {})[ttid] = txn
@@ -418,7 +419,7 @@ class TransactionManager(object):
"""
Abort pending transactions initiated by a node
"""
- neo.logging.debug('Abort TXN for %s', node)
+ neo.lib.logging.debug('Abort TXN for %s', node)
uuid = node.getUUID()
# XXX: this loop is usefull only during an import
for nuuid, ntid in list(self._queue):
@@ -433,7 +434,7 @@ class TransactionManager(object):
del self._node_dict[node]
def log(self):
- neo.logging.info('Transactions:')
+ neo.lib.logging.info('Transactions:')
for txn in self._ttid_dict.itervalues():
- neo.logging.info(' %r', txn)
+ neo.lib.logging.info(' %r', txn)
Modified: trunk/neo/master/verification.py
==============================================================================
--- trunk/neo/master/verification.py [iso-8859-1] (original)
+++ trunk/neo/master/verification.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -16,8 +16,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import neo
-from neo.util import dump
-from neo.protocol import ClusterStates, Packets, NodeStates
+from neo.lib.util import dump
+from neo.lib.protocol import ClusterStates, Packets, NodeStates
from neo.master.handlers import BaseServiceHandler
@@ -117,11 +117,11 @@ class VerificationManager(BaseServiceHan
em, nm = self.app.em, self.app.nm
# wait for any missing node
- neo.logging.debug('waiting for the cluster to be operational')
+ neo.lib.logging.debug('waiting for the cluster to be operational')
while not self.app.pt.operational():
em.poll(1)
- neo.logging.info('start to verify data')
+ neo.lib.logging.info('start to verify data')
# Gather all unfinished transactions.
self._askStorageNodesAndWait(Packets.AskUnfinishedTransactions(),
@@ -197,7 +197,7 @@ class VerificationManager(BaseServiceHan
def answerUnfinishedTransactions(self, conn, tid_list):
uuid = conn.getUUID()
- neo.logging.info('got unfinished transactions %s from %r',
+ neo.lib.logging.info('got unfinished transactions %s from %r',
[dump(tid) for tid in tid_list], conn)
if not self._gotAnswerFrom(uuid):
return
@@ -222,19 +222,19 @@ class VerificationManager(BaseServiceHan
def tidNotFound(self, conn, message):
uuid = conn.getUUID()
- neo.logging.info('TID not found: %s', message)
+ neo.lib.logging.info('TID not found: %s', message)
if not self._gotAnswerFrom(uuid):
return
self._oid_set = None
def answerObjectPresent(self, conn, oid, tid):
uuid = conn.getUUID()
- neo.logging.info('object %s:%s found', dump(oid), dump(tid))
+ neo.lib.logging.info('object %s:%s found', dump(oid), dump(tid))
self._gotAnswerFrom(uuid)
def oidNotFound(self, conn, message):
uuid = conn.getUUID()
- neo.logging.info('OID not found: %s', message)
+ neo.lib.logging.info('OID not found: %s', message)
app = self.app
if not self._gotAnswerFrom(uuid):
return
Modified: trunk/neo/neoctl/app.py
==============================================================================
--- trunk/neo/neoctl/app.py [iso-8859-1] (original)
+++ trunk/neo/neoctl/app.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -16,8 +16,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.neoctl.neoctl import NeoCTL, NotReadyException
-from neo.util import bin, dump
-from neo.protocol import ClusterStates, NodeStates, NodeTypes
+from neo.lib.util import bin, dump
+from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes
action_dict = {
'print': {
Modified: trunk/neo/neoctl/handler.py
==============================================================================
--- trunk/neo/neoctl/handler.py [iso-8859-1] (original)
+++ trunk/neo/neoctl/handler.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,8 +15,8 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-from neo.handler import EventHandler
-from neo.protocol import ErrorCodes, Packets
+from neo.lib.handler import EventHandler
+from neo.lib.protocol import ErrorCodes, Packets
class CommandEventHandler(EventHandler):
""" Base handler for command """
Modified: trunk/neo/neoctl/neoctl.py
==============================================================================
--- trunk/neo/neoctl/neoctl.py [iso-8859-1] (original)
+++ trunk/neo/neoctl/neoctl.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -15,11 +15,11 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-from neo.connector import getConnectorHandler
-from neo.connection import ClientConnection
-from neo.event import EventManager
+from neo.lib.connector import getConnectorHandler
+from neo.lib.connection import ClientConnection
+from neo.lib.event import EventManager
from neo.neoctl.handler import CommandEventHandler
-from neo.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
+from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
class NotReadyException(Exception):
pass
Removed: trunk/neo/node.py
==============================================================================
--- trunk/neo/node.py [iso-8859-1] (original)
+++ trunk/neo/node.py (removed)
@@ -1,486 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-from time import time
-
-import neo
-from neo.util import dump
-from neo.protocol import NodeTypes, NodeStates
-
-from neo import attributeTracker
-
-class Node(object):
- """This class represents a node."""
-
- def __init__(self, manager, address=None, uuid=None,
- state=NodeStates.UNKNOWN):
- self._state = state
- self._address = address
- self._uuid = uuid
- self._manager = manager
- self._last_state_change = time()
- self._connection = None
- manager.add(self)
-
- def notify(self, packet):
- assert self.isConnected(), 'Not connected'
- self._connection.notify(packet)
-
- def ask(self, packet, *args, **kw):
- assert self.isConnected(), 'Not connected'
- self._connection.ask(packet, *args, **kw)
-
- def answer(self, packet, msg_id=None):
- assert self.isConnected(), 'Not connected'
- self._connection.answer(packet, msg_id)
-
- def getLastStateChange(self):
- return self._last_state_change
-
- def getState(self):
- return self._state
-
- def setState(self, new_state):
- if self._state == new_state:
- return
- old_state = self._state
- self._state = new_state
- self._last_state_change = time()
- self._manager._updateState(self, old_state)
-
- def setAddress(self, address):
- if self._address == address:
- return
- old_address = self._address
- self._address = address
- self._manager._updateAddress(self, old_address)
-
- def getAddress(self):
- return self._address
-
- def setUUID(self, uuid):
- if self._uuid == uuid:
- return
- old_uuid = self._uuid
- self._uuid = uuid
- self._manager._updateUUID(self, old_uuid)
- self._manager._updateIdentified(self)
-
- def getUUID(self):
- return self._uuid
-
- def onConnectionClosed(self):
- """
- Callback from node's connection when closed
- """
- assert self._connection is not None
- self._connection = None
- self._manager._updateIdentified(self)
-
- def setConnection(self, connection):
- """
- Define the connection that is currently available to this node.
- """
- assert connection is not None
- assert self._connection is None
- self._connection = connection
- connection.setOnClose(self.onConnectionClosed)
- self._manager._updateIdentified(self)
-
- def getConnection(self):
- """
- Returns the connection to the node if available
- """
- assert self._connection is not None
- return self._connection
-
- def isConnected(self):
- """
- Returns True is a connection is established with the node
- """
- return self._connection is not None
-
- def isIdentified(self):
- """
- Returns True is the node is connected and identified
- """
- return self._connection is not None and self._uuid is not None
-
- def __repr__(self):
- return '<%s(uuid=%s, address=%s, state=%s) at %x>' % (
- self.__class__.__name__,
- dump(self._uuid),
- self._address,
- self._state,
- id(self),
- )
-
- def isMaster(self):
- return False
-
- def isStorage(self):
- return False
-
- def isClient(self):
- return False
-
- def isAdmin(self):
- return False
-
- def isRunning(self):
- return self._state == NodeStates.RUNNING
-
- def isUnknown(self):
- return self._state == NodeStates.UNKNOWN
-
- def isTemporarilyDown(self):
- return self._state == NodeStates.TEMPORARILY_DOWN
-
- def isDown(self):
- return self._state == NodeStates.DOWN
-
- def isBroken(self):
- return self._state == NodeStates.BROKEN
-
- def isHidden(self):
- return self._state == NodeStates.HIDDEN
-
- def isPending(self):
- return self._state == NodeStates.PENDING
-
- def setRunning(self):
- self.setState(NodeStates.RUNNING)
-
- def setUnknown(self):
- self.setState(NodeStates.UNKNOWN)
-
- def setTemporarilyDown(self):
- self.setState(NodeStates.TEMPORARILY_DOWN)
-
- def setDown(self):
- self.setState(NodeStates.DOWN)
-
- def setBroken(self):
- self.setState(NodeStates.BROKEN)
-
- def setHidden(self):
- self.setState(NodeStates.HIDDEN)
-
- def setPending(self):
- self.setState(NodeStates.PENDING)
-
- def asTuple(self):
- """ Returned tuple is intented to be used in procotol encoders """
- return (self.getType(), self._address, self._uuid, self._state)
-
- def __gt__(self, node):
- # sort per UUID if defined
- if self._uuid is not None:
- return self._uuid > node._uuid
- return self._address > node._address
-
- def getType(self):
- try:
- return NODE_CLASS_MAPPING[self.__class__]
- except KeyError:
- raise NotImplementedError
-
- def whoSetState(self):
- """
- Debugging method: call this method to know who set the current
- state value.
- """
- return attributeTracker.whoSet(self, '_state')
-
-attributeTracker.track(Node)
-
-class MasterNode(Node):
- """This class represents a master node."""
-
- def isMaster(self):
- return True
-
-class StorageNode(Node):
- """This class represents a storage node."""
-
- def isStorage(self):
- return True
-
-class ClientNode(Node):
- """This class represents a client node."""
-
- def isClient(self):
- return True
-
-class AdminNode(Node):
- """This class represents an admin node."""
-
- def isAdmin(self):
- return True
-
-
-NODE_TYPE_MAPPING = {
- NodeTypes.MASTER: MasterNode,
- NodeTypes.STORAGE: StorageNode,
- NodeTypes.CLIENT: ClientNode,
- NodeTypes.ADMIN: AdminNode,
-}
-NODE_CLASS_MAPPING = {
- StorageNode: NodeTypes.STORAGE,
- MasterNode: NodeTypes.MASTER,
- ClientNode: NodeTypes.CLIENT,
- AdminNode: NodeTypes.ADMIN,
-}
-
-class NodeManager(object):
- """This class manages node status."""
-
- # TODO: rework getXXXList() methods, filter first by node type
- # - getStorageList(identified=True, connected=True, )
- # - getList(...)
-
- def __init__(self):
- self._node_set = set()
- self._address_dict = {}
- self._uuid_dict = {}
- self._type_dict = {}
- self._state_dict = {}
- self._identified_dict = {}
-
- def add(self, node):
- if node in self._node_set:
- neo.logging.warning('adding a known node %r, ignoring', node)
- return
- self._node_set.add(node)
- self._updateAddress(node, None)
- self._updateUUID(node, None)
- self.__updateSet(self._type_dict, None, node.__class__, node)
- self.__updateSet(self._state_dict, None, node.getState(), node)
- self._updateIdentified(node)
-
- def remove(self, node):
- if node not in self._node_set:
- neo.logging.warning('removing unknown node %r, ignoring', node)
- return
- self._node_set.remove(node)
- self.__drop(self._address_dict, node.getAddress())
- self.__drop(self._uuid_dict, node.getUUID())
- self.__dropSet(self._state_dict, node.getState(), node)
- self.__dropSet(self._type_dict, node.__class__, node)
- uuid = node.getUUID()
- if uuid in self._identified_dict:
- del self._identified_dict[uuid]
-
- def __drop(self, index_dict, key):
- try:
- del index_dict[key]
- except KeyError:
- # a node may have not be indexed by uuid or address, eg.:
- # - a master known by address but without UUID
- # - a client or admin node that don't have listening address
- pass
-
- def __update(self, index_dict, old_key, new_key, node):
- """ Update an index from old to new key """
- if old_key is not None:
- assert index_dict[old_key] is node, '%r is stored as %s, ' \
- 'moving %r to %s' % (index_dict[old_key], old_key, node,
- new_key)
- del index_dict[old_key]
- if new_key is not None:
- index_dict[new_key] = node
-
- def _updateIdentified(self, node):
- uuid = node.getUUID()
- identified = node.isIdentified()
- if not identified and uuid in self._identified_dict:
- del self._identified_dict[uuid]
- elif identified:
- self._identified_dict[uuid] = node
-
- def _updateAddress(self, node, old_address):
- self.__update(self._address_dict, old_address, node.getAddress(), node)
-
- def _updateUUID(self, node, old_uuid):
- self.__update(self._uuid_dict, old_uuid, node.getUUID(), node)
-
- def __dropSet(self, set_dict, key, node):
- if key in set_dict and node in set_dict[key]:
- set_dict[key].remove(node)
-
- def __updateSet(self, set_dict, old_key, new_key, node):
- """ Update a set index from old to new key """
- if old_key in set_dict:
- set_dict[old_key].remove(node)
- if new_key is not None:
- set_dict.setdefault(new_key, set()).add(node)
-
- def _updateState(self, node, old_state):
- self.__updateSet(self._state_dict, old_state, node.getState(), node)
-
- def getList(self, node_filter=None):
- if filter is None:
- return list(self._node_set)
- return filter(node_filter, self._node_set)
-
- def getIdentifiedList(self, pool_set=None):
- """
- Returns a generator to iterate over identified nodes
- pool_set is an iterable of UUIDs allowed
- """
- if pool_set is not None:
- identified_nodes = self._identified_dict.items()
- return [v for k, v in identified_nodes if k in pool_set]
- return list(self._identified_dict.values())
-
- def getConnectedList(self):
- """
- Returns a generator to iterate over connected nodes
- """
- # TODO: use an index
- return [x for x in self._node_set if x.isConnected()]
-
- def __getList(self, index_dict, key):
- return index_dict.setdefault(key, set())
-
- def getByStateList(self, state):
- """ Get a node list filtered per the node state """
- return list(self.__getList(self._state_dict, state))
-
- def __getTypeList(self, type_klass, only_identified=False):
- node_set = self.__getList(self._type_dict, type_klass)
- if only_identified:
- return [x for x in node_set if x.getUUID() in self._identified_dict]
- return list(node_set)
-
- def getMasterList(self, only_identified=False):
- """ Return a list with master nodes """
- return self.__getTypeList(MasterNode, only_identified)
-
- def getStorageList(self, only_identified=False):
- """ Return a list with storage nodes """
- return self.__getTypeList(StorageNode, only_identified)
-
- def getClientList(self, only_identified=False):
- """ Return a list with client nodes """
- return self.__getTypeList(ClientNode, only_identified)
-
- def getAdminList(self, only_identified=False):
- """ Return a list with admin nodes """
- return self.__getTypeList(AdminNode, only_identified)
-
- def getByAddress(self, address):
- """ Return the node that match with a given address """
- return self._address_dict.get(address, None)
-
- def getByUUID(self, uuid):
- """ Return the node that match with a given UUID """
- return self._uuid_dict.get(uuid, None)
-
- def hasAddress(self, address):
- return address in self._address_dict
-
- def hasUUID(self, uuid):
- return uuid in self._uuid_dict
-
- def _createNode(self, klass, **kw):
- return klass(self, **kw)
-
- def createMaster(self, **kw):
- """ Create and register a new master """
- return self._createNode(MasterNode, **kw)
-
- def createStorage(self, **kw):
- """ Create and register a new storage """
- return self._createNode(StorageNode, **kw)
-
- def createClient(self, **kw):
- """ Create and register a new client """
- return self._createNode(ClientNode, **kw)
-
- def createAdmin(self, **kw):
- """ Create and register a new admin """
- return self._createNode(AdminNode, **kw)
-
- def _getClassFromNodeType(self, node_type):
- klass = NODE_TYPE_MAPPING.get(node_type)
- if klass is None:
- raise ValueError('Unknown node type : %s' % node_type)
- return klass
-
- def createFromNodeType(self, node_type, **kw):
- return self._createNode(self._getClassFromNodeType(node_type), **kw)
-
- def init(self):
- self._node_set.clear()
- self._type_dict.clear()
- self._state_dict.clear()
- self._uuid_dict.clear()
- self._address_dict.clear()
-
- def update(self, node_list):
- for node_type, addr, uuid, state in node_list:
- # This should be done here (although klass might not be used in this
- # iteration), as it raises if type is not valid.
- klass = self._getClassFromNodeType(node_type)
-
- # lookup in current table
- node_by_uuid = self.getByUUID(uuid)
- node_by_addr = self.getByAddress(addr)
- node = node_by_uuid or node_by_addr
-
- log_args = (node_type, dump(uuid), addr, state)
- if node is None:
- if state == NodeStates.DOWN:
- neo.logging.debug('NOT creating node %s %s %s %s',
- *log_args)
- else:
- node = self._createNode(klass, address=addr, uuid=uuid,
- state=state)
- neo.logging.debug('creating node %r', node)
- else:
- assert isinstance(node, klass), 'node %r is not ' \
- 'of expected type: %r' % (node, klass)
- assert None in (node_by_uuid, node_by_addr) or \
- node_by_uuid is node_by_addr, \
- 'Discrepancy between node_by_uuid (%r) and ' \
- 'node_by_addr (%r)' % (node_by_uuid, node_by_addr)
- if state == NodeStates.DOWN:
- neo.logging.debug('droping node %r (%r), found with %s ' \
- '%s %s %s', node, node.isConnected(), *log_args)
- if node.isConnected():
- # cut this connection, node removed by handler
- node.getConnection().close()
- self.remove(node)
- else:
- neo.logging.debug('updating node %r to %s %s %s %s',
- node, *log_args)
- node.setUUID(uuid)
- node.setAddress(addr)
- node.setState(state)
- self.log()
-
- def log(self):
- neo.logging.info('Node manager : %d nodes' % len(self._node_set))
- for node in sorted(list(self._node_set)):
- uuid = dump(node.getUUID()) or '-' * 32
- address = node.getAddress() or ''
- if address:
- address = '%s:%d' % address
- neo.logging.info(' * %32s | %8s | %22s | %s' % (
- uuid, node.getType(), address, node.getState()))
-
Removed: trunk/neo/profiling.py
==============================================================================
--- trunk/neo/profiling.py [iso-8859-1] (original)
+++ trunk/neo/profiling.py (removed)
@@ -1,39 +0,0 @@
-#
-# Copyright (C) 2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-"""
-Profiling is done with tiny-profiler, a very simple profiler.
-
-It is different from python's built-in profilers in that it requires
-developpers to explicitely put probes on specific methods, reducing:
-- profiling overhead
-- undesired result entries
-
-You can get this profiler at:
- https://svn.erp5.org/repos/public/erp5/trunk/utils/tiny_profiler
-"""
-
-PROFILING_ENABLED = False
-
-if PROFILING_ENABLED:
- from tiny_profiler import profiler_decorator, profiler_report
-else:
- def profiler_decorator(func):
- return func
-
- def profiler_report():
- pass
Removed: trunk/neo/protocol.py
==============================================================================
--- trunk/neo/protocol.py [iso-8859-1] (original)
+++ trunk/neo/protocol.py (removed)
@@ -1,2074 +0,0 @@
-
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-from struct import pack, unpack, error, calcsize
-from socket import inet_ntoa, inet_aton
-from neo.profiling import profiler_decorator
-from cStringIO import StringIO
-
-from neo.util import Enum
-
-# The protocol version (major, minor).
-PROTOCOL_VERSION = (4, 1)
-
-# Size restrictions.
-MIN_PACKET_SIZE = 10
-MAX_PACKET_SIZE = 0x4000000
-PACKET_HEADER_FORMAT = '!LHL'
-PACKET_HEADER_SIZE = calcsize(PACKET_HEADER_FORMAT)
-# Check that header size is the expected value.
-# If it is not, it means that struct module result is incompatible with
-# "reference" platform (python 2.4 on x86-64).
-assert PACKET_HEADER_SIZE == 10, \
- 'Unsupported platform, packet header length = %i' % (PACKET_HEADER_SIZE, )
-RESPONSE_MASK = 0x8000
-
-class ErrorCodes(Enum):
- ACK = Enum.Item(0)
- NOT_READY = Enum.Item(1)
- OID_NOT_FOUND = Enum.Item(2)
- OID_DOES_NOT_EXIST = Enum.Item(6)
- TID_NOT_FOUND = Enum.Item(3)
- PROTOCOL_ERROR = Enum.Item(4)
- BROKEN_NODE = Enum.Item(5)
- ALREADY_PENDING = Enum.Item(7)
-ErrorCodes = ErrorCodes()
-
-class ClusterStates(Enum):
- RECOVERING = Enum.Item(1)
- VERIFYING = Enum.Item(2)
- RUNNING = Enum.Item(3)
- STOPPING = Enum.Item(4)
-ClusterStates = ClusterStates()
-
-class NodeTypes(Enum):
- MASTER = Enum.Item(1)
- STORAGE = Enum.Item(2)
- CLIENT = Enum.Item(3)
- ADMIN = Enum.Item(4)
-NodeTypes = NodeTypes()
-
-class NodeStates(Enum):
- RUNNING = Enum.Item(1)
- TEMPORARILY_DOWN = Enum.Item(2)
- DOWN = Enum.Item(3)
- BROKEN = Enum.Item(4)
- HIDDEN = Enum.Item(5)
- PENDING = Enum.Item(6)
- UNKNOWN = Enum.Item(7)
-NodeStates = NodeStates()
-
-class CellStates(Enum):
- UP_TO_DATE = Enum.Item(1)
- OUT_OF_DATE = Enum.Item(2)
- FEEDING = Enum.Item(3)
- DISCARDED = Enum.Item(4)
-CellStates = CellStates()
-
-class LockState(Enum):
- NOT_LOCKED = Enum.Item(1)
- GRANTED = Enum.Item(2)
- GRANTED_TO_OTHER = Enum.Item(3)
-LockState = LockState()
-
-# used for logging
-node_state_prefix_dict = {
- NodeStates.RUNNING: 'R',
- NodeStates.TEMPORARILY_DOWN: 'T',
- NodeStates.DOWN: 'D',
- NodeStates.BROKEN: 'B',
- NodeStates.HIDDEN: 'H',
- NodeStates.PENDING: 'P',
- NodeStates.UNKNOWN: 'U',
-}
-
-# used for logging
-cell_state_prefix_dict = {
- CellStates.UP_TO_DATE: 'U',
- CellStates.OUT_OF_DATE: 'O',
- CellStates.FEEDING: 'F',
- CellStates.DISCARDED: 'D',
-}
-
-# Other constants.
-INVALID_UUID = '\0' * 16
-INVALID_TID = '\xff' * 8
-INVALID_OID = '\xff' * 8
-INVALID_PARTITION = 0xffffffff
-ZERO_TID = '\0' * 8
-ZERO_OID = '\0' * 8
-OID_LEN = len(INVALID_OID)
-TID_LEN = len(INVALID_TID)
-
-UUID_NAMESPACES = {
- NodeTypes.STORAGE: 'S',
- NodeTypes.MASTER: 'M',
- NodeTypes.CLIENT: 'C',
- NodeTypes.ADMIN: 'A',
-}
-
-class ProtocolError(Exception):
- """ Base class for protocol errors, close the connection """
- pass
-
-class PacketMalformedError(ProtocolError):
- """ Close the connection and set the node as broken"""
- pass
-
-class UnexpectedPacketError(ProtocolError):
- """ Close the connection and set the node as broken"""
- pass
-
-class NotReadyError(ProtocolError):
- """ Just close the connection """
- pass
-
-class BrokenNodeDisallowedError(ProtocolError):
- """ Just close the connection """
- pass
-
-
-# packet parser
-def _decodeClusterState(state):
- cluster_state = ClusterStates.get(state)
- if cluster_state is None:
- raise PacketMalformedError('invalid cluster state %d' % state)
- return cluster_state
-
-def _decodeNodeState(state):
- node_state = NodeStates.get(state)
- if node_state is None:
- raise PacketMalformedError('invalid node state %d' % state)
- return node_state
-
-def _decodeNodeType(original_node_type):
- node_type = NodeTypes.get(original_node_type)
- if node_type is None:
- raise PacketMalformedError('invalid node type %d' % original_node_type)
- return node_type
-
-def _decodeErrorCode(original_error_code):
- error_code = ErrorCodes.get(original_error_code)
- if error_code is None:
- raise PacketMalformedError('invalid error code %d' %
- original_error_code)
- return error_code
-
-def _decodeLockState(original_lock_state):
- lock_state = LockState.get(original_lock_state)
- if lock_state is None:
- raise PacketMalformedError('invalid lock state %d' % (
- original_lock_state, ))
- return lock_state
-
-def _decodeAddress(address):
- if address == '\0' * 6:
- return None
- (ip, port) = unpack('!4sH', address)
- return (inet_ntoa(ip), port)
-
-def _encodeAddress(address):
- if address is None:
- return '\0' * 6
- # address is a tuple (ip, port)
- return pack('!4sH', inet_aton(address[0]), address[1])
-
-def _decodeUUID(uuid):
- if uuid == INVALID_UUID:
- return None
- return uuid
-
-def _encodeUUID(uuid):
- if uuid is None:
- return INVALID_UUID
- return uuid
-
-def _decodePTID(ptid):
- ptid = unpack('!Q', ptid)[0]
- if ptid == 0:
- return None
- return ptid
-
-def _encodePTID(ptid):
- if ptid is None:
- ptid = 0
- assert isinstance(ptid, (int, long)), ptid
- return pack('!Q', ptid)
-
-def _decodeTID(tid):
- if tid == INVALID_TID:
- return None
- return tid
-
-def _encodeTID(tid):
- if tid is None:
- return INVALID_TID
- return tid
-
-def _decodeString(buf, name, offset=0):
- buf = buf[offset:]
- (size, ) = unpack('!L', buf[:4])
- string = buf[4:4+size]
- if len(string) != size:
- raise PacketMalformedError("can't read string <%s>" % name)
- return (string, buf[offset+4+size:])
-
-@profiler_decorator
-def _encodeString(buf):
- return pack('!L', len(buf)) + buf
-
-class Packet(object):
- """
- Base class for any packet definition.
- Each subclass should override _encode() and _decode() and return a string or
- a tuple respectively.
- """
-
- _ignore_when_closed = False
- _header_format = None
- _header_len = None
- _request = None
- _answer = None
- _body = None
- _code = None
- _id = None
-
- def __init__(self, *args, **kw):
- assert self._code is not None, "Packet class not registered"
- if args != () or kw != {}:
- body = self._encode(*args, **kw)
- else:
- body = ''
- self._body = body
-
- def decode(self):
- assert self._body is not None
- try:
- return self._decode(self._body)
- except error, msg: # struct.error
- name = self.__class__.__name__
- raise PacketMalformedError("%s fail (%s)" % (name, msg))
- except PacketMalformedError, msg:
- name = self.__class__.__name__
- raise PacketMalformedError("%s fail (%s)" % (name, msg))
-
- def setContent(self, msg_id, body):
- """ Register the packet content for future decoding """
- self._id = msg_id
- self._body = body
-
- def setId(self, value):
- self._id = value
-
- def getId(self):
- assert self._id is not None, "No identifier applied on the packet"
- return self._id
-
- def getCode(self):
- return self._code
-
- def getType(self):
- return self.__class__
-
- @profiler_decorator
- def encode(self):
- """ Encode a packet as a string to send it over the network """
- content = self._body
- length = PACKET_HEADER_SIZE + len(content)
- return (pack(PACKET_HEADER_FORMAT, self._id, self._code, length),
- content)
-
- @profiler_decorator
- def __len__(self):
- return PACKET_HEADER_SIZE + len(self._body)
-
- def __repr__(self):
- return '%s[%r]' % (self.__class__.__name__, self._id)
-
- def __eq__(self, other):
- """ Compare packets with their code instead of content """
- if other is None:
- return False
- assert isinstance(other, Packet)
- return self._code == other._code
-
- def _encode(self, *args, **kw):
- """ Default encoder, join all arguments """
- args = list(args)
- args.extend(kw.values())
- return ''.join([str(i) for i in args] or '')
-
- def _decode(self, body):
- """ Default decoder, message must be empty """
- assert body == '', "Non-empty packet decoding not implemented """
- return ()
-
- def isError(self):
- return isinstance(self, Error)
-
- def isResponse(self):
- return self._code & RESPONSE_MASK == RESPONSE_MASK
-
- def getAnswerClass(self):
- return self._answer
-
- def ignoreOnClosedConnection(self):
- """
- Tells if this packet must be ignored when its connection is closed
- when it is handled.
- """
- return self._ignore_when_closed
-
-class Notify(Packet):
- """
- General purpose notification (remote logging)
- """
- def _encode(self, message):
- return message
-
- def _decode(self, body):
- return (body, )
-
-class Ping(Packet):
- """
- Check if a peer is still alive. Any -> Any.
- """
- pass
-
-class Pong(Packet):
- """
- Notify being alive. Any -> Any.
- """
- pass
-
-class RequestIdentification(Packet):
- """
- Request a node identification. This must be the first packet for any
- connection. Any -> Any.
- """
- _header_format = '!LLH16s6s'
-
- def _encode(self, node_type, uuid, address, name):
- uuid = _encodeUUID(uuid)
- address = _encodeAddress(address)
- return pack(self._header_format, PROTOCOL_VERSION[0],
- PROTOCOL_VERSION[1], node_type, uuid, address) + \
- _encodeString(name)
-
- def _decode(self, body):
- r = unpack(self._header_format, body[:self._header_len])
- major, minor, node_type, uuid, address = r
- address = _decodeAddress(address)
- (name, _) = _decodeString(body, 'name', offset=self._header_len)
- node_type = _decodeNodeType(node_type)
- uuid = _decodeUUID(uuid)
- if (major, minor) != PROTOCOL_VERSION:
- raise PacketMalformedError('protocol version mismatch')
- return (node_type, uuid, address, name)
-
-class AcceptIdentification(Packet):
- """
- Accept a node identification. This should be a reply to Request Node
- Identification. Any -> Any.
- """
- _header_format = '!H16sLL16s'
-
- def _encode(self, node_type, uuid,
- num_partitions, num_replicas, your_uuid):
- uuid = _encodeUUID(uuid)
- your_uuid = _encodeUUID(your_uuid)
- return pack(self._header_format, node_type, uuid,
- num_partitions, num_replicas, your_uuid)
-
- def _decode(self, body):
- r = unpack(self._header_format, body)
- node_type, uuid, num_partitions, num_replicas, your_uuid = r
- node_type = _decodeNodeType(node_type)
- uuid = _decodeUUID(uuid)
- your_uuid = _decodeUUID(your_uuid)
- return (node_type, uuid, num_partitions, num_replicas, your_uuid)
-
-class AskPrimary(Packet):
- """
- Ask for the current primary master node. This must be the second message
- when connecting to a master node. Any -> M.
- """
- pass
-
-class AnswerPrimary(Packet):
- """
- Reply to Ask Primary Master. This message includes a list of known master
- nodes to make sure that a peer has the same information. M -> Any.
- """
- _header_format = '!16sL'
- _list_entry_format = '!6s16s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, primary_uuid, known_master_list):
- primary_uuid = _encodeUUID(primary_uuid)
- body = [pack(self._header_format, primary_uuid,
- len(known_master_list))]
- for address, uuid in known_master_list:
- uuid = _encodeUUID(uuid)
- address = _encodeAddress(address)
- body.append(pack(self._list_entry_format, address, uuid))
- return ''.join(body)
-
- def _decode(self, body):
- packet_offset = self._header_len
- (primary_uuid, n) = unpack(self._header_format,
- body[:packet_offset])
- known_master_list = []
- list_entry_len = self._list_entry_len
- list_entry_format = self._list_entry_format
- for _ in xrange(n):
- next_packet_offset = packet_offset + list_entry_len
- address, uuid = unpack(list_entry_format,
- body[packet_offset:next_packet_offset])
- packet_offset = next_packet_offset
- address = _decodeAddress(address)
- uuid = _decodeUUID(uuid)
- known_master_list.append((address, uuid))
- primary_uuid = _decodeUUID(primary_uuid)
- return (primary_uuid, known_master_list)
-
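The AnswerPrimary layout above (a fixed header carrying an entry count, followed by that many fixed-size entries) can be sketched in isolation as follows. This is a minimal Python 3 sketch, not the NEO implementation: the function names `encode_master_list` and `decode_master_list` are hypothetical, and addresses and UUIDs are assumed to be already-packed 6-byte and 16-byte strings, since `_encodeAddress` and `_encodeUUID` are not shown in this part of the diff.

```python
from struct import pack, unpack, calcsize

HEADER_FORMAT = '!16sL'   # primary UUID, number of list entries
ENTRY_FORMAT = '!6s16s'   # packed address, node UUID
HEADER_LEN = calcsize(HEADER_FORMAT)
ENTRY_LEN = calcsize(ENTRY_FORMAT)

def encode_master_list(primary_uuid, known_master_list):
    """Hypothetical helper: header with entry count, then the entries."""
    body = [pack(HEADER_FORMAT, primary_uuid, len(known_master_list))]
    for address, uuid in known_master_list:
        body.append(pack(ENTRY_FORMAT, address, uuid))
    return b''.join(body)

def decode_master_list(body):
    """Hypothetical helper: read the count, then walk fixed-size entries."""
    primary_uuid, n = unpack(HEADER_FORMAT, body[:HEADER_LEN])
    offset = HEADER_LEN
    known_master_list = []
    for _ in range(n):
        next_offset = offset + ENTRY_LEN
        known_master_list.append(
            unpack(ENTRY_FORMAT, body[offset:next_offset]))
        offset = next_offset
    return primary_uuid, known_master_list
```

Because both formats start with `!`, `struct` uses network byte order and adds no padding, so each entry occupies exactly `calcsize(ENTRY_FORMAT)` bytes on the wire.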
-class AnnouncePrimary(Packet):
- """
- Announce a primary master node election. PM -> SM.
- """
- pass
-
-class ReelectPrimary(Packet):
- """
- Force a re-election of a primary master node. M -> M.
- """
- pass
-
-class AskLastIDs(Packet):
- """
- Ask the last OID, the last TID and the last Partition Table ID that
- a storage node stores. Used to recover information. PM -> S, S -> PM.
- """
- pass
-
-class AnswerLastIDs(Packet):
- """
- Reply to Ask Last IDs. S -> PM, PM -> S.
- """
- def _encode(self, loid, ltid, lptid):
- # In this case, loid is a valid OID but is treated as invalid. This is
- # not an issue because OID 0 is hard-coded and will never be
- # generated.
- if loid is None:
- loid = INVALID_OID
- ltid = _encodeTID(ltid)
- lptid = _encodePTID(lptid)
- return loid + ltid + lptid
-
- def _decode(self, body):
- (loid, ltid, lptid) = unpack('!8s8s8s', body)
- if loid == INVALID_OID:
- loid = None
- ltid = _decodeTID(ltid)
- lptid = _decodePTID(lptid)
- return (loid, ltid, lptid)
-
-class AskPartitionTable(Packet):
- """
- Ask the full partition table. PM -> S.
- """
- pass
-
-class AnswerPartitionTable(Packet):
- """
- Answer rows in a partition table. S -> PM.
- """
- _header_format = '!8sL'
- _row_entry_format = '!LL'
- _row_entry_len = calcsize(_row_entry_format)
- _cell_entry_format = '!16sH'
- _cell_entry_len = calcsize(_cell_entry_format)
-
- def _encode(self, ptid, row_list):
- ptid = _encodePTID(ptid)
- body = [pack(self._header_format, ptid, len(row_list))]
- row_entry_format = self._row_entry_format
- cell_entry_format = self._cell_entry_format
- for offset, cell_list in row_list:
- body.append(pack(row_entry_format, offset, len(cell_list)))
- for uuid, state in cell_list:
- uuid = _encodeUUID(uuid)
- body.append(pack(cell_entry_format, uuid, state))
- return ''.join(body)
-
- def _decode(self, body):
- index = self._header_len
- (ptid, n) = unpack(self._header_format, body[:index])
- ptid = _decodePTID(ptid)
- row_list = []
- cell_list = []
- row_entry_format = self._row_entry_format
- row_entry_len = self._row_entry_len
- cell_entry_format = self._cell_entry_format
- cell_entry_len = self._cell_entry_len
- for _ in xrange(n):
- next_index = index + row_entry_len
- offset, m = unpack(row_entry_format, body[index:next_index])
- index = next_index
- for _ in xrange(m):
- next_index = index + cell_entry_len
- uuid, state = unpack(cell_entry_format, body[index:next_index])
- index = next_index
- state = CellStates.get(state)
- uuid = _decodeUUID(uuid)
- cell_list.append((uuid, state))
- row_list.append((offset, tuple(cell_list)))
- del cell_list[:]
- return (ptid, row_list)
-
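The partition table packets use a two-level layout: a header with the row count, then for each row a row header with the cell count, followed by that many cells. A minimal Python 3 sketch of that nesting is given below. The names `encode_rows` and `decode_rows` are hypothetical, the PTID is assumed to be a raw 8-byte string, and cell states are kept as plain integers, whereas the original code maps them through `_decodePTID` and `CellStates.get`.

```python
from struct import pack, unpack, calcsize

HEADER_FORMAT = '!8sL'   # partition table ID, number of rows
ROW_FORMAT = '!LL'       # partition offset, number of cells in the row
CELL_FORMAT = '!16sH'    # node UUID, cell state
HEADER_LEN = calcsize(HEADER_FORMAT)
ROW_LEN = calcsize(ROW_FORMAT)
CELL_LEN = calcsize(CELL_FORMAT)

def encode_rows(ptid, row_list):
    """Hypothetical helper: serialise rows, each followed by its cells."""
    body = [pack(HEADER_FORMAT, ptid, len(row_list))]
    for offset, cell_list in row_list:
        body.append(pack(ROW_FORMAT, offset, len(cell_list)))
        for uuid, state in cell_list:
            body.append(pack(CELL_FORMAT, uuid, state))
    return b''.join(body)

def decode_rows(body):
    """Hypothetical helper: walk the body with a single running index."""
    index = HEADER_LEN
    ptid, row_count = unpack(HEADER_FORMAT, body[:index])
    row_list = []
    for _ in range(row_count):
        offset, cell_count = unpack(ROW_FORMAT, body[index:index + ROW_LEN])
        index += ROW_LEN
        cell_list = []
        for _ in range(cell_count):
            cell_list.append(
                unpack(CELL_FORMAT, body[index:index + CELL_LEN]))
            index += CELL_LEN
        row_list.append((offset, tuple(cell_list)))
    return ptid, row_list
```

The sketch builds a fresh `cell_list` per row instead of reusing one list and clearing it with `del cell_list[:]`; the decoded result is the same, since each row stores an immutable tuple copy of its cells.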
-class SendPartitionTable(Packet):
- """
- Send rows in a partition table to update other nodes. PM -> S, C.
- """
- _header_format = '!8sL'
- _row_entry_format = '!LL'
- _row_entry_len = calcsize(_row_entry_format)
- _cell_entry_format = '!16sH'
- _cell_entry_len = calcsize(_cell_entry_format)
-
- def _encode(self, ptid, row_list):
- ptid = _encodePTID(ptid)
- body = [pack(self._header_format, ptid, len(row_list))]
- row_entry_format = self._row_entry_format
- cell_entry_format = self._cell_entry_format
- for offset, cell_list in row_list:
- body.append(pack(row_entry_format, offset, len(cell_list)))
- for uuid, state in cell_list:
- uuid = _encodeUUID(uuid)
- body.append(pack(cell_entry_format, uuid, state))
- return ''.join(body)
-
- def _decode(self, body):
- index = self._header_len
- (ptid, n,) = unpack(self._header_format, body[:index])
- ptid = _decodePTID(ptid)
- row_list = []
- cell_list = []
- row_entry_format = self._row_entry_format
- row_entry_len = self._row_entry_len
- cell_entry_format = self._cell_entry_format
- cell_entry_len = self._cell_entry_len
- for _ in xrange(n):
- next_index = index + row_entry_len
- offset, m = unpack(row_entry_format, body[index:next_index])
- index = next_index
- for _ in xrange(m):
- next_index = index + cell_entry_len
- uuid, state = unpack(cell_entry_format, body[index:next_index])
- index = next_index
- state = CellStates.get(state)
- uuid = _decodeUUID(uuid)
- cell_list.append((uuid, state))
- row_list.append((offset, tuple(cell_list)))
- del cell_list[:]
- return (ptid, row_list)
-
-class NotifyPartitionChanges(Packet):
- """
- Notify a subset of a partition table. This is used to notify changes.
- PM -> S, C.
- """
- _header_format = '!8sL'
- _list_entry_format = '!L16sH'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, ptid, cell_list):
- ptid = _encodePTID(ptid)
- body = [pack(self._header_format, ptid, len(cell_list))]
- list_entry_format = self._list_entry_format
- for offset, uuid, state in cell_list:
- uuid = _encodeUUID(uuid)
- body.append(pack(list_entry_format, offset, uuid, state))
- return ''.join(body)
-
- def _decode(self, body):
- packet_offset = self._header_len
- (ptid, n) = unpack(self._header_format, body[:packet_offset])
- ptid = _decodePTID(ptid)
- cell_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_packet_offset = packet_offset + list_entry_len
- (offset, uuid, state) = unpack(list_entry_format,
- body[packet_offset:next_packet_offset])
- packet_offset = next_packet_offset
- state = CellStates.get(state)
- uuid = _decodeUUID(uuid)
- cell_list.append((offset, uuid, state))
- return (ptid, cell_list)
-
-class NotifyReplicationDone(Packet):
- """
- Notify the master node that a partition has been successfully replicated
- from one storage node to another.
- S -> M
- """
- _header_format = '!L'
-
- def _encode(self, offset):
- return pack(self._header_format, offset)
-
- def _decode(self, body):
- (offset, ) = unpack(self._header_format, body)
- return (offset, )
-
-class StartOperation(Packet):
- """
- Tell a storage node to start an operation. Until a storage node receives
- this message, it must not serve client nodes. PM -> S.
- """
- pass
-
-class StopOperation(Packet):
- """
- Tell a storage node to stop an operation. Once a storage node receives
- this message, it must not serve client nodes. PM -> S.
- """
- pass
-
-class AskUnfinishedTransactions(Packet):
- """
- Ask unfinished transactions. PM -> S.
- """
- pass
-
-class AnswerUnfinishedTransactions(Packet):
- """
- Answer unfinished transactions. S -> PM.
- """
- _header_format = '!L'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, tid_list):
- body = [pack(self._header_format, len(tid_list))]
- body.extend(tid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (n,) = unpack(self._header_format, body[:offset])
- tid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- tid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- tid_list.append(tid)
- return (tid_list,)
-
-class AskObjectPresent(Packet):
- """
- Ask if an object is present. If not present, OID_NOT_FOUND should be
- returned. PM -> S.
- """
- def _decode(self, body):
- (oid, tid) = unpack('8s8s', body)
- return (oid, _decodeTID(tid))
-
-class AnswerObjectPresent(Packet):
- """
- Answer that an object is present. S -> PM.
- """
- def _decode(self, body):
- (oid, tid) = unpack('8s8s', body)
- return (oid, _decodeTID(tid))
-
-class DeleteTransaction(Packet):
- """
- Delete a transaction. PM -> S.
- """
- _header_format = '!8sL'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, tid, oid_list):
- body = [pack(self._header_format, tid, len(oid_list))]
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (tid, n) = unpack(self._header_format, body[:offset])
- oid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- oid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- oid_list.append(oid)
- return (tid, oid_list)
-
-class CommitTransaction(Packet):
- """
- Commit a transaction. PM -> S.
- """
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (_decodeTID(tid), )
-
-class AskBeginTransaction(Packet):
- """
- Ask to begin a new transaction. C -> PM.
- """
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- return (_decodeTID(unpack('8s', body)[0]), )
-
-class AnswerBeginTransaction(Packet):
- """
- Answer when a transaction begins; give a TID if necessary. PM -> C.
- """
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (tid, )
-
-class AskFinishTransaction(Packet):
- """
- Finish a transaction. C -> PM.
- """
- _header_format = '!8sL'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, tid, oid_list):
- body = [pack(self._header_format, tid, len(oid_list))]
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (tid, n) = unpack(self._header_format, body[:offset])
- oid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- oid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- oid_list.append(oid)
- return (tid, oid_list)
-
-class AnswerTransactionFinished(Packet):
- """
- Answer when a transaction is finished. PM -> C.
- """
- def _encode(self, ttid, tid):
- return _encodeTID(ttid) + _encodeTID(tid)
-
- def _decode(self, body):
- (ttid, tid) = unpack('8s8s', body)
- return (_decodeTID(ttid), _decodeTID(tid))
-
-class AskLockInformation(Packet):
- """
- Lock information on a transaction. PM -> S.
- """
- # XXX: Identical to InvalidateObjects and AskFinishTransaction
- _header_format = '!8s8sL'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, ttid, tid, oid_list):
- body = [pack(self._header_format, ttid, tid, len(oid_list))]
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (ttid, tid, n) = unpack(self._header_format, body[:offset])
- oid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- oid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- oid_list.append(oid)
- return (ttid, tid, oid_list)
-
-class AnswerInformationLocked(Packet):
- """
- Notify information on a transaction locked. S -> PM.
- """
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (_decodeTID(tid), )
-
-class InvalidateObjects(Packet):
- """
- Invalidate objects. PM -> C.
- """
- _header_format = '!8sL'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, tid, oid_list):
- body = [pack(self._header_format, tid, len(oid_list))]
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (tid, n) = unpack(self._header_format, body[:offset])
- oid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- oid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- oid_list.append(oid)
- return (tid, oid_list)
-
-class NotifyUnlockInformation(Packet):
- """
- Unlock information on a transaction. PM -> S.
- """
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (_decodeTID(tid), )
-
-class AskNewOIDs(Packet):
- """
- Ask new object IDs. C -> PM.
- """
- _header_format = '!H'
-
- def _encode(self, num_oids):
- return pack(self._header_format, num_oids)
-
- def _decode(self, body):
- return unpack(self._header_format, body) # num oids
-
-class AnswerNewOIDs(Packet):
- """
- Answer new object IDs. PM -> C.
- """
- _header_format = '!H'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, oid_list):
- body = [pack(self._header_format, len(oid_list))]
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (n,) = unpack(self._header_format, body[:offset])
- oid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- oid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- oid_list.append(oid)
- return (oid_list,)
-
-class AskStoreObject(Packet):
- """
- Ask to store an object. Send an OID, an original serial, a current
- transaction ID, and data. C -> S.
- """
- _header_format = '!8s8s8sBL8sB'
-
- @profiler_decorator
- def _encode(self, oid, serial, compression, checksum, data, data_serial,
- tid, unlock):
- if serial is None:
- serial = INVALID_TID
- if data_serial is None:
- data_serial = INVALID_TID
- unlock = unlock and 1 or 0
- return pack(self._header_format, oid, serial, tid, compression,
- checksum, data_serial, unlock) + _encodeString(data)
-
- def _decode(self, body):
- header_len = self._header_len
- r = unpack(self._header_format, body[:header_len])
- oid, serial, tid, compression, checksum, data_serial, unlock = r
- serial = _decodeTID(serial)
- data_serial = _decodeTID(data_serial)
- (data, _) = _decodeString(body, 'data', offset=header_len)
- return (oid, serial, compression, checksum, data, data_serial, tid,
- bool(unlock))
-
-class AnswerStoreObject(Packet):
- """
- Answer if an object has been stored. If an object is in conflict,
- a serial of the conflicting transaction is returned. In this case,
- if this serial is newer than the current transaction ID, a client
- node must not try to resolve the conflict. S -> C.
- """
- _header_format = '!B8s8s'
-
- def _encode(self, conflicting, oid, serial):
- if serial is None:
- serial = INVALID_TID
- return pack(self._header_format, conflicting, oid, serial)
-
- def _decode(self, body):
- (conflicting, oid, serial) = unpack(self._header_format, body)
- return (conflicting, oid, serial)
-
-class AbortTransaction(Packet):
- """
- Abort a transaction. C -> S, PM.
- """
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (tid, )
-
-class AskStoreTransaction(Packet):
- """
- Ask to store a transaction. C -> S.
- """
- _header_format = '!8sLHHH'
-
- def _encode(self, tid, user, desc, ext, oid_list):
- lengths = (len(oid_list), len(user), len(desc), len(ext))
- body = [pack(self._header_format, tid, *lengths)]
- body.append(user)
- body.append(desc)
- body.append(ext)
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- r = unpack(self._header_format, body[:self._header_len])
- tid, oid_len, user_len, desc_len, ext_len = r
- body = body[self._header_len:]
- user = body[:user_len]
- body = body[user_len:]
- desc = body[:desc_len]
- body = body[desc_len:]
- ext = body[:ext_len]
- body = body[ext_len:]
- oid_list = []
- for _ in xrange(oid_len):
- (oid, ) = unpack('8s', body[:8])
- body = body[8:]
- oid_list.append(oid)
- return (tid, user, desc, ext, oid_list)
-
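AskStoreTransaction shows a different layout from the count-prefixed lists: the header carries the lengths of three variable-length strings plus the OID count, and the payload is simply the strings and OIDs concatenated in that order. A minimal Python 3 sketch under those assumptions follows; `encode_transaction` and `decode_transaction` are hypothetical names, and the TID is treated as an opaque 8-byte string.

```python
from struct import pack, unpack, calcsize

# tid, OID count, len(user), len(desc), len(ext)
HEADER_FORMAT = '!8sLHHH'
HEADER_LEN = calcsize(HEADER_FORMAT)
OID_LEN = 8  # OIDs are packed as '8s' in the original code

def encode_transaction(tid, user, desc, ext, oid_list):
    """Hypothetical helper: lengths go in the header, data follows."""
    lengths = (len(oid_list), len(user), len(desc), len(ext))
    body = [pack(HEADER_FORMAT, tid, *lengths), user, desc, ext]
    body.extend(oid_list)
    return b''.join(body)

def decode_transaction(body):
    """Hypothetical helper: slice each field using the header lengths."""
    header = unpack(HEADER_FORMAT, body[:HEADER_LEN])
    tid, oid_count, user_len, desc_len, ext_len = header
    pos = HEADER_LEN
    fields = []
    for length in (user_len, desc_len, ext_len):
        fields.append(body[pos:pos + length])
        pos += length
    user, desc, ext = fields
    oid_list = [body[pos + i * OID_LEN:pos + (i + 1) * OID_LEN]
                for i in range(oid_count)]
    return tid, user, desc, ext, oid_list
```

Note that the `H` fields limit each of `user`, `desc` and `ext` to 65535 bytes; `pack` raises `struct.error` for longer values. The sketch advances a position index rather than re-slicing `body` after every field, which avoids copying the remaining payload each time.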
-class AnswerStoreTransaction(Packet):
- """
- Answer if transaction has been stored. S -> C.
- """
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (tid, )
-
-class AskObject(Packet):
- """
- Ask a stored object by its OID and a serial or a TID if given. If a serial
- is specified, the specified revision of an object will be returned. If
- a TID is specified, an object right before the TID will be returned. S,C -> S.
- """
- _header_format = '!8s8s8s'
-
- def _encode(self, oid, serial, tid):
- tid = _encodeTID(tid)
- serial = _encodeTID(serial) # serial is the previous TID
- return pack(self._header_format, oid, serial, tid)
-
- def _decode(self, body):
- (oid, serial, tid) = unpack(self._header_format, body)
- if serial == INVALID_TID:
- serial = None
- tid = _decodeTID(tid)
- return (oid, serial, tid)
-
-class AnswerObject(Packet):
- """
- Answer the requested object. S -> C.
- """
- _header_format = '!8s8s8s8sBL'
-
- def _encode(self, oid, serial_start, serial_end, compression,
- checksum, data, data_serial):
- if serial_start is None:
- serial_start = INVALID_TID
- if serial_end is None:
- serial_end = INVALID_TID
- if data_serial is None:
- data_serial = INVALID_TID
- return pack(self._header_format, oid, serial_start, serial_end,
- data_serial, compression, checksum) + _encodeString(data)
-
- def _decode(self, body):
- header_len = self._header_len
- r = unpack(self._header_format, body[:header_len])
- oid, serial_start, serial_end, data_serial, compression, checksum = r
- if serial_end == INVALID_TID:
- serial_end = None
- if data_serial == INVALID_TID:
- data_serial = None
- (data, _) = _decodeString(body, 'data', offset=header_len)
- return (oid, serial_start, serial_end, compression, checksum, data,
- data_serial)
-
-class AskTIDs(Packet):
- """
- Ask for TIDs between a range of offsets. The order of TIDs is descending,
- and the range is [first, last). C -> S.
- """
- _header_format = '!QQL'
-
- def _encode(self, first, last, partition):
- return pack(self._header_format, first, last, partition)
-
- def _decode(self, body):
- return unpack(self._header_format, body) # first, last, partition
-
-class AnswerTIDs(Packet):
- """
- Answer the requested TIDs. S -> C.
- """
- _header_format = '!L'
- _list_entry_format = '8s'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, tid_list):
- body = [pack(self._header_format, len(tid_list))]
- body.extend(tid_list)
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (n, ) = unpack(self._header_format, body[:offset])
- tid_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- tid = unpack(list_entry_format, body[offset:next_offset])[0]
- offset = next_offset
- tid_list.append(tid)
- return (tid_list,)
-
-class AskTIDsFrom(Packet):
- """
- Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
- S -> S.
- """
- _header_format = '!8s8sLL'
- _list_entry_format = 'L'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, min_tid, max_tid, length, partition_list):
- body = [pack(self._header_format, min_tid, max_tid, length,
- len(partition_list))]
- list_entry_format = self._list_entry_format
- for partition in partition_list:
- body.append(pack(list_entry_format, partition))
- return ''.join(body)
-
- def _decode(self, body):
- body = StringIO(body)
- read = body.read
- header = unpack(self._header_format, read(self._header_len))
- min_tid, max_tid, length, list_length = header
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- partition_list = []
- for _ in xrange(list_length):
- partition = unpack(list_entry_format, read(list_entry_len))[0]
- partition_list.append(partition)
- return (min_tid, max_tid, length, partition_list)
-
-class AnswerTIDsFrom(AnswerTIDs):
- """
- Answer the requested TIDs. S -> S.
- """
- pass
-
-class AskTransactionInformation(Packet):
- """
- Ask information about a transaction. Any -> S.
- """
- def _decode(self, body):
- (tid, ) = unpack('8s', body)
- return (tid, )
-
-class AnswerTransactionInformation(Packet):
- """
- Answer information (user, description) about a transaction. S -> Any.
- """
- _header_format = '!8sHHHBL'
-
- def _encode(self, tid, user, desc, ext, packed, oid_list):
- packed = packed and 1 or 0
- body = [pack(self._header_format, tid, len(user), len(desc), len(ext),
- packed, len(oid_list))]
- body.append(user)
- body.append(desc)
- body.append(ext)
- body.extend(oid_list)
- return ''.join(body)
-
- def _decode(self, body):
- r = unpack(self._header_format, body[:self._header_len])
- tid, user_len, desc_len, ext_len, packed, oid_len = r
- packed = bool(packed)
- body = body[self._header_len:]
- user = body[:user_len]
- body = body[user_len:]
- desc = body[:desc_len]
- body = body[desc_len:]
- ext = body[:ext_len]
- body = body[ext_len:]
- oid_list = []
- for _ in xrange(oid_len):
- (oid, ) = unpack('8s', body[:8])
- body = body[8:]
- oid_list.append(oid)
- return (tid, user, desc, ext, packed, oid_list)
-
-class AskObjectHistory(Packet):
- """
- Ask history information for a given object. The order of serials is
- descending, and the range is [first, last]. C -> S.
- """
- _header_format = '!8sQQ'
-
- def _encode(self, oid, first, last):
- return pack(self._header_format, oid, first, last)
-
- def _decode(self, body):
- (oid, first, last) = unpack(self._header_format, body)
- return (oid, first, last)
-
-class AnswerObjectHistory(Packet):
- """
- Answer history information (serial, size) for an object. S -> C.
- """
- _header_format = '!8sL'
- _list_entry_format = '!8sL'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, oid, history_list):
- body = [pack(self._header_format, oid, len(history_list))]
- list_entry_format = self._list_entry_format
- for serial, size in history_list:
- body.append(pack(list_entry_format, serial, size))
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (oid, length) = unpack(self._header_format, body[:offset])
- history_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(length):
- next_offset = offset + list_entry_len
- serial, size = unpack(list_entry_format, body[offset:next_offset])
- offset = next_offset
- history_list.append((serial, size))
- return (oid, history_list)
-
-class AskObjectHistoryFrom(Packet):
- """
- Ask history information for a given object. The order of serials is
- ascending, and starts at (or above) min_serial for min_oid. S -> S.
- """
- _header_format = '!8s8s8sLL'
-
- def _encode(self, min_oid, min_serial, max_serial, length, partition):
- return pack(self._header_format, min_oid, min_serial, max_serial,
- length, partition)
-
- def _decode(self, body):
- # min_oid, min_serial, max_serial, length, partition
- return unpack(self._header_format, body)
-
-class AnswerObjectHistoryFrom(Packet):
- """
- Answer the requested serials. S -> S.
- """
- _header_format = '!L'
- _list_entry_format = '!8sL'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, object_dict):
- body = [pack(self._header_format, len(object_dict))]
- append = body.append
- extend = body.extend
- list_entry_format = self._list_entry_format
- for oid, serial_list in object_dict.iteritems():
- append(pack(list_entry_format, oid, len(serial_list)))
- extend(serial_list)
- return ''.join(body)
-
- def _decode(self, body):
- body = StringIO(body)
- read = body.read
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- object_dict = {}
- dict_len = unpack(self._header_format, read(self._header_len))[0]
- for _ in xrange(dict_len):
- oid, serial_len = unpack(list_entry_format, read(list_entry_len))
- object_dict[oid] = [read(TID_LEN) for _ in xrange(serial_len)]
- return (object_dict, )
-
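AnswerObjectHistoryFrom decodes its body through a file-like reader rather than by index arithmetic. A minimal Python 3 sketch of that stream-based approach is shown below, using `io.BytesIO` in place of the Python 2 `StringIO`. The names `encode_object_dict` and `decode_object_dict` are hypothetical, and `TID_LEN = 8` is an assumption inferred from the `'8s'` formats used for TIDs elsewhere in this file; its actual definition is not part of this excerpt.

```python
from io import BytesIO
from struct import pack, unpack, calcsize

HEADER_FORMAT = '!L'     # number of OIDs in the dictionary
ENTRY_FORMAT = '!8sL'    # OID, number of serials for that OID
HEADER_LEN = calcsize(HEADER_FORMAT)
ENTRY_LEN = calcsize(ENTRY_FORMAT)
TID_LEN = 8              # assumed value, see the note above

def encode_object_dict(object_dict):
    """Hypothetical helper: each OID entry is followed by its serials."""
    body = [pack(HEADER_FORMAT, len(object_dict))]
    for oid, serial_list in object_dict.items():
        body.append(pack(ENTRY_FORMAT, oid, len(serial_list)))
        body.extend(serial_list)
    return b''.join(body)

def decode_object_dict(body):
    """Hypothetical helper: consume the body sequentially via read()."""
    read = BytesIO(body).read
    (dict_len,) = unpack(HEADER_FORMAT, read(HEADER_LEN))
    object_dict = {}
    for _ in range(dict_len):
        oid, serial_len = unpack(ENTRY_FORMAT, read(ENTRY_LEN))
        object_dict[oid] = [read(TID_LEN) for _ in range(serial_len)]
    return object_dict
```

Reading through a stream keeps the current position implicit, which is convenient when entries have variable size as they do here. One caveat of this style is that `read()` silently returns fewer bytes at end of input, so a truncated body surfaces as a `struct.error` or a short serial rather than as an explicit length check.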
-class AskPartitionList(Packet):
- """
- All the following messages are sent from neoctl to the admin node.
- Ask for information about partitions.
- """
- _header_format = '!LL16s'
-
- def _encode(self, min_offset, max_offset, uuid):
- uuid = _encodeUUID(uuid)
- body = [pack(self._header_format, min_offset, max_offset, uuid)]
- return ''.join(body)
-
- def _decode(self, body):
- (min_offset, max_offset, uuid) = unpack(self._header_format, body)
- uuid = _decodeUUID(uuid)
- return (min_offset, max_offset, uuid)
-
-class AnswerPartitionList(Packet):
- """
- Answer information about partition
- """
- _header_format = '!8sL'
- _row_entry_format = '!LL'
- _row_entry_len = calcsize(_row_entry_format)
- _cell_entry_format = '!16sH'
- _cell_entry_len = calcsize(_cell_entry_format)
-
- def _encode(self, ptid, row_list):
- ptid = _encodePTID(ptid)
- body = [pack(self._header_format, ptid, len(row_list))]
- row_entry_format = self._row_entry_format
- cell_entry_format = self._cell_entry_format
- for offset, cell_list in row_list:
- body.append(pack(row_entry_format, offset, len(cell_list)))
- for uuid, state in cell_list:
- uuid = _encodeUUID(uuid)
- body.append(pack(cell_entry_format, uuid, state))
- return ''.join(body)
-
- def _decode(self, body):
- index = self._header_len
- (ptid, n) = unpack(self._header_format, body[:index])
- ptid = _decodePTID(ptid)
- row_list = []
- cell_list = []
- row_entry_format = self._row_entry_format
- row_entry_len = self._row_entry_len
- cell_entry_format = self._cell_entry_format
- cell_entry_len = self._cell_entry_len
- for _ in xrange(n):
- next_index = index + row_entry_len
- offset, m = unpack(row_entry_format, body[index:next_index])
- index = next_index
- for _ in xrange(m):
- next_index = index + cell_entry_len
- uuid, state = unpack(cell_entry_format, body[index:next_index])
- index = next_index
- state = CellStates.get(state)
- uuid = _decodeUUID(uuid)
- cell_list.append((uuid, state))
- row_list.append((offset, tuple(cell_list)))
- del cell_list[:]
- return (ptid, row_list)
-
-class AskNodeList(Packet):
- """
- Ask information about nodes
- """
- _header_format = '!H'
-
- def _encode(self, node_type):
- return ''.join([pack(self._header_format, node_type)])
-
- def _decode(self, body):
- (node_type, ) = unpack(self._header_format, body)
- node_type = _decodeNodeType(node_type)
- return (node_type,)
-
-class AnswerNodeList(Packet):
- """
- Answer information about nodes
- """
- _header_format = '!L'
- _list_entry_format = '!H6s16sH'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, node_list):
- body = [pack(self._header_format, len(node_list))]
- list_entry_format = self._list_entry_format
- for node_type, address, uuid, state in node_list:
- uuid = _encodeUUID(uuid)
- address = _encodeAddress(address)
- body.append(pack(list_entry_format, node_type, address, uuid,
- state))
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (n,) = unpack(self._header_format, body[:offset])
- node_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- r = unpack(list_entry_format, body[offset:next_offset])
- offset = next_offset
- node_type, address, uuid, state = r
- address = _decodeAddress(address)
- node_type = _decodeNodeType(node_type)
- state = _decodeNodeState(state)
- uuid = _decodeUUID(uuid)
- node_list.append((node_type, address, uuid, state))
- return (node_list,)
-
-class SetNodeState(Packet):
- """
- Set the node state
- """
- _header_format = '!16sHB'
-
- def _encode(self, uuid, state, modify_partition_table):
- uuid = _encodeUUID(uuid)
- return ''.join([pack(self._header_format, uuid, state,
- modify_partition_table)])
-
- def _decode(self, body):
- (uuid, state, modify) = unpack(self._header_format, body)
- state = _decodeNodeState(state)
- uuid = _decodeUUID(uuid)
- return (uuid, state, modify)
-
-class AddPendingNodes(Packet):
- """
- Ask the primary to include some pending node in the partition table
- """
- _header_format = '!H'
- _list_header_format = '!16s'
- _list_header_len = calcsize(_list_header_format)
-
- def _encode(self, uuid_list=()):
- list_header_format = self._list_header_format
- # an empty list means all current pending nodes
- uuid_list = [pack(list_header_format, _encodeUUID(uuid)) \
- for uuid in uuid_list]
- return pack(self._header_format, len(uuid_list)) + ''.join(uuid_list)
-
- def _decode(self, body):
- header_len = self._header_len
- (n, ) = unpack(self._header_format, body[:header_len])
- list_header_format = self._list_header_format
- list_header_len = self._list_header_len
- uuid_list = [unpack(list_header_format,
- body[header_len+i*list_header_len:\
- header_len+(i+1)*list_header_len])[0] for i in xrange(n)]
- uuid_list = [_decodeUUID(x) for x in uuid_list]
- return (uuid_list, )
-
-class NotifyNodeInformation(Packet):
- """
- Notify information about one or more nodes. PM -> Any.
- """
- _header_format = '!L'
- _list_entry_format = '!H6s16sH'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, node_list):
- body = [pack(self._header_format, len(node_list))]
- list_entry_format = self._list_entry_format
- for node_type, address, uuid, state in node_list:
- uuid = _encodeUUID(uuid)
- address = _encodeAddress(address)
- body.append(pack(list_entry_format, node_type, address, uuid,
- state))
- return ''.join(body)
-
- def _decode(self, body):
- offset = self._header_len
- (n,) = unpack(self._header_format, body[:offset])
- node_list = []
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- for _ in xrange(n):
- next_offset = offset + list_entry_len
- r = unpack(list_entry_format, body[offset:next_offset])
- offset = next_offset
- node_type, address, uuid, state = r
- address = _decodeAddress(address)
- node_type = _decodeNodeType(node_type)
- state = _decodeNodeState(state)
- uuid = _decodeUUID(uuid)
- node_list.append((node_type, address, uuid, state))
- return (node_list,)
-
-class AskNodeInformation(Packet):
- """
- Ask node information
- """
- pass
-
-class AnswerNodeInformation(Packet):
- """
- Answer node information
- """
- pass
-
-class SetClusterState(Packet):
- """
- Set the cluster state
- """
- _header_format = '!H'
-
- def _encode(self, state):
- return pack(self._header_format, state)
-
- def _decode(self, body):
- (state, ) = unpack(self._header_format, body[:self._header_len])
- state = _decodeClusterState(state)
- return (state, )
-
-class NotifyClusterInformation(Packet):
- """
- Notify information about the cluster
- """
- _header_format = '!H'
-
- def _encode(self, state):
- return pack(self._header_format, state)
-
- def _decode(self, body):
- (state, ) = unpack(self._header_format, body)
- state = _decodeClusterState(state)
- return (state, )
-
-class AskClusterState(Packet):
- """
- Ask state of the cluster
- """
- pass
-
-class AnswerClusterState(Packet):
- """
- Answer state of the cluster
- """
- _header_format = '!H'
-
- def _encode(self, state):
- return pack(self._header_format, state)
-
- def _decode(self, body):
- (state, ) = unpack(self._header_format, body)
- state = _decodeClusterState(state)
- return (state, )
-
-class NotifyLastOID(Packet):
- """
- Notify last OID generated
- """
- def _decode(self, body):
- (loid, ) = unpack('8s', body)
- return (loid, )
-
-class AskObjectUndoSerial(Packet):
- """
- Ask storage the serial where object data is when undoing given transaction,
- for a list of OIDs.
- C -> S
- """
- _header_format = '!8s8s8sL'
-
- def _encode(self, tid, ltid, undone_tid, oid_list):
- body = StringIO()
- write = body.write
- write(pack(self._header_format, tid, ltid, undone_tid, len(oid_list)))
- for oid in oid_list:
- write(oid)
- return body.getvalue()
-
- def _decode(self, body):
- body = StringIO(body)
- read = body.read
- tid, ltid, undone_tid, oid_list_len = unpack(self._header_format,
- read(self._header_len))
- oid_list = [read(8) for _ in xrange(oid_list_len)]
- return tid, ltid, undone_tid, oid_list
-
-class AnswerObjectUndoSerial(Packet):
- """
- Answer serials at which object data is when undoing a given transaction.
- object_tid_dict has the following format:
- key: oid
- value: 3-tuple
- current_serial (TID)
- The latest serial visible to the undoing transaction.
- undo_serial (TID)
- Where undone data is (tid at which data is before given undo).
- is_current (bool)
- If current_serial's data is current on storage.
- S -> C
- """
- _header_format = '!L'
- _list_entry_format = '!8s8s8sB'
- _list_entry_len = calcsize(_list_entry_format)
-
- def _encode(self, object_tid_dict):
- body = StringIO()
- write = body.write
- write(pack(self._header_format, len(object_tid_dict)))
- list_entry_format = self._list_entry_format
- for oid, (current_serial, undo_serial, is_current) in \
- object_tid_dict.iteritems():
- if undo_serial is None:
- undo_serial = ZERO_TID
- write(pack(list_entry_format, oid, current_serial, undo_serial,
- is_current))
- return body.getvalue()
-
- def _decode(self, body):
- body = StringIO(body)
- read = body.read
- object_tid_dict = {}
- list_entry_format = self._list_entry_format
- list_entry_len = self._list_entry_len
- object_tid_len = unpack(self._header_format, read(self._header_len))[0]
- for _ in xrange(object_tid_len):
- oid, current_serial, undo_serial, is_current = unpack(
- list_entry_format, read(list_entry_len))
- if undo_serial == ZERO_TID:
- undo_serial = None
- object_tid_dict[oid] = (current_serial, undo_serial,
- bool(is_current))
- return (object_tid_dict, )
-
-class AskHasLock(Packet):
- """
- Ask a storage if oid is locked by another transaction.
- C -> S
- """
- def _encode(self, tid, oid):
- return _encodeTID(tid) + _encodeTID(oid)
-
- def _decode(self, body):
- return (_decodeTID(body[:8]), _decodeTID(body[8:]))
-
-class AnswerHasLock(Packet):
- """
- Answer whether a transaction holds the write lock for requested object.
- """
- _header_format = '!8sH'
-
- def _encode(self, oid, state):
- return pack(self._header_format, oid, state)
-
- def _decode(self, body):
- oid, state = unpack(self._header_format, body)
- return (oid, _decodeLockState(state))
-
-class AskCheckCurrentSerial(Packet):
- """
- Verifies if given serial is current for object oid in the database, and
- take a write lock on it (so that this state is not altered until
- transaction ends).
- """
- _header_format = '!8s8s8s'
-
- def _encode(self, tid, serial, oid):
- return tid + serial + oid
-
- def _decode(self, body):
- return unpack(self._header_format, body)
-
-class AnswerCheckCurrentSerial(AnswerStoreObject):
- """
- Answer to AskCheckCurrentSerial.
- Same structure as AnswerStoreObject, to handle the same way, except there
- is nothing to invalidate in any client's cache.
- """
- pass
-
-class AskBarrier(Packet):
- """
- Initiates a "network barrier", allowing the node sending this packet to know
- when all packets sent previously on the same connection have been handled
- by its peer.
- """
- pass
-
-class AnswerBarrier(Packet):
- pass
-
-class AskPack(Packet):
- """
- Request a pack at given TID.
- C -> M
- M -> S
- """
- def _encode(self, tid):
- return _encodeTID(tid)
-
- def _decode(self, body):
- return (_decodeTID(body), )
-
-class AnswerPack(Packet):
- """
- Inform that packing is over.
- S -> M
- M -> C
- """
- _header_format = '!H'
-
- def _encode(self, status):
- return pack(self._header_format, int(status))
-
- def _decode(self, body):
- return (bool(unpack(self._header_format, body)[0]), )
-
-class AskCheckTIDRange(Packet):
- """
- Ask some stats about a range of transactions.
- Used to know if there are differences between a replicating node and
- reference node.
- S -> S
- """
- _header_format = '!8s8sLL'
-
- def _encode(self, min_tid, max_tid, length, partition):
- return pack(self._header_format, min_tid, max_tid, length, partition)
-
- def _decode(self, body):
- # min_tid, max_tid, length, partition
- return unpack(self._header_format, body)
-
-class AnswerCheckTIDRange(Packet):
- """
- Stats about a range of transactions.
- Used to know if there are differences between a replicating node and
- reference node.
- S -> S
- """
- _header_format = '!8sLLQ8s'
- def _encode(self, min_tid, length, count, tid_checksum, max_tid):
- return pack(self._header_format, min_tid, length, count, tid_checksum,
- max_tid)
-
- def _decode(self, body):
- # min_tid, length, count, tid_checksum, max_tid
- return unpack(self._header_format, body)
-
-class AskCheckSerialRange(Packet):
- """
- Ask some stats about a range of object history.
- Used to know if there are differences between a replicating node and
- reference node.
- S -> S
- """
- _header_format = '!8s8s8sLL'
-
- def _encode(self, min_oid, min_serial, max_tid, length, partition):
- return pack(self._header_format, min_oid, min_serial, max_tid, length,
- partition)
-
- def _decode(self, body):
- # min_oid, min_serial, max_tid, length, partition
- return unpack(self._header_format, body)
-
-class AnswerCheckSerialRange(Packet):
- """
- Stats about a range of object history.
- Used to know if there are differences between a replicating node and
- reference node.
- S -> S
- """
- _header_format = '!8s8sLLQ8sQ8s'
-
- def _encode(self, min_oid, min_serial, length, count, oid_checksum,
- max_oid, serial_checksum, max_serial):
- return pack(self._header_format, min_oid, min_serial, length, count,
- oid_checksum, max_oid, serial_checksum, max_serial)
-
- def _decode(self, body):
- # min_oid, min_serial, length, count, oid_checksum, max_oid,
- # serial_checksum, max_serial
- return unpack(self._header_format, body)
-
-class AskLastTransaction(Packet):
- """
- Ask last committed TID.
- C -> M
- """
- pass
-
-class AnswerLastTransaction(Packet):
- """
- Answer last committed TID.
- M -> C
- """
- def _encode(self, tid):
- return tid
-
- def _decode(self, body):
- return (body, )
-
-class NotifyReady(Packet):
- """
- Notify that node is ready to serve requests.
- S -> M
- """
- pass
-
-class Error(Packet):
- """
- Error is a special type of message, because this can be sent against
- any other message, even if such a message does not expect a reply
- usually. Any -> Any.
- """
- _header_format = '!H'
-
- def _encode(self, code, message):
- return pack(self._header_format, code) + _encodeString(message)
-
- def _decode(self, body):
- offset = self._header_len
- (code, ) = unpack(self._header_format, body[:offset])
- code = _decodeErrorCode(code)
- (message, _) = _decodeString(body, 'message', offset=offset)
- return (code, message)
-
-
-def initMessage(klass):
- if klass._header_format is not None:
- klass._header_len = calcsize(klass._header_format)
-
-StaticRegistry = {}
-def register(code, request, answer=None, ignore_when_closed=None):
- """ Register a packet in the packet registry """
- # register the request
- # assert code & RESPONSE_MASK == 0
- assert code not in StaticRegistry, "Duplicate request packet code"
- initMessage(request)
- request._code = code
- request._answer = answer
- StaticRegistry[code] = request
- if ignore_when_closed is None:
- # By default, on a closed connection:
- # - request: ignore
- # - answer: keep
- # - notification: keep
- ignore_when_closed = answer is not None
- request._ignore_when_closed = ignore_when_closed
- if answer not in (None, Error):
- initMessage(answer)
- # compute the answer code
- code = code | RESPONSE_MASK
- answer._request = request
- answer._code = code
- # and register the answer packet
- assert code not in StaticRegistry, "Duplicate response packet code"
- StaticRegistry[code] = answer
- return (request, answer)
- return request
-
-class ParserState(object):
- """
- Parser internal state.
- To be considered opaque datatype outside of PacketRegistry.parse .
- """
- payload = None
-
- def set(self, payload):
- self.payload = payload
-
- def get(self):
- return self.payload
-
- def clear(self):
- self.payload = None
-
-class PacketRegistry(dict):
- """
- Packet registry that checks packet code uniqueness and provides an index
- """
-
- def __init__(self):
- dict.__init__(self)
- # load packet classes
- self.update(StaticRegistry)
-
- def parse(self, buf, state_container):
- state = state_container.get()
- if state is None:
- header = buf.read(PACKET_HEADER_SIZE)
- if header is None:
- return None
- msg_id, msg_type, msg_len = unpack(PACKET_HEADER_FORMAT, header)
- try:
- packet_klass = self[msg_type]
- except KeyError:
- raise PacketMalformedError('Unknown packet type')
- if msg_len > MAX_PACKET_SIZE:
- raise PacketMalformedError('message too big (%d)' % msg_len)
- if msg_len < MIN_PACKET_SIZE:
- raise PacketMalformedError('message too small (%d)' % msg_len)
- msg_len -= PACKET_HEADER_SIZE
- else:
- msg_id, packet_klass, msg_len = state
- data = buf.read(msg_len)
- if data is None:
- # Not enough.
- if state is None:
- state_container.set((msg_id, packet_klass, msg_len))
- return None
- if state:
- state_container.clear()
- packet = packet_klass()
- packet.setContent(msg_id, data)
- return packet
-
- # packets registration
- Error = register(0x8000, Error)
- Notify = register(0x0032, Notify)
- Ping, Pong = register(
- 0x0001,
- Ping,
- Pong)
- RequestIdentification, AcceptIdentification = register(
- 0x0002,
- RequestIdentification,
- AcceptIdentification)
- AskPrimary, AnswerPrimary = register(
- 0x0003,
- AskPrimary,
- AnswerPrimary)
- AnnouncePrimary = register(0x0004, AnnouncePrimary)
- ReelectPrimary = register(0x0005, ReelectPrimary)
- NotifyNodeInformation = register(0x0006, NotifyNodeInformation)
- AskLastIDs, AnswerLastIDs = register(
- 0x0007,
- AskLastIDs,
- AnswerLastIDs)
- AskPartitionTable, AnswerPartitionTable = register(
- 0x0008,
- AskPartitionTable,
- AnswerPartitionTable)
- SendPartitionTable = register(0x0009, SendPartitionTable)
- NotifyPartitionChanges = register(0x000A, NotifyPartitionChanges)
- StartOperation = register(0x000B, StartOperation)
- StopOperation = register(0x000C, StopOperation)
- AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
- 0x000D,
- AskUnfinishedTransactions,
- AnswerUnfinishedTransactions)
- AskObjectPresent, AnswerObjectPresent = register(
- 0x000f,
- AskObjectPresent,
- AnswerObjectPresent)
- DeleteTransaction = register(0x0010, DeleteTransaction)
- CommitTransaction = register(0x0011, CommitTransaction)
- AskBeginTransaction, AnswerBeginTransaction = register(
- 0x0012,
- AskBeginTransaction,
- AnswerBeginTransaction)
- AskFinishTransaction, AnswerTransactionFinished = register(
- 0x0013,
- AskFinishTransaction,
- AnswerTransactionFinished,
- ignore_when_closed=False,
- )
- AskLockInformation, AnswerInformationLocked = register(
- 0x0014,
- AskLockInformation,
- AnswerInformationLocked,
- )
- InvalidateObjects = register(0x0015, InvalidateObjects)
- NotifyUnlockInformation = register(0x0016, NotifyUnlockInformation)
- AskNewOIDs, AnswerNewOIDs = register(
- 0x0017,
- AskNewOIDs,
- AnswerNewOIDs)
- AskStoreObject, AnswerStoreObject = register(
- 0x0018,
- AskStoreObject,
- AnswerStoreObject)
- AbortTransaction = register(0x0019, AbortTransaction)
- AskStoreTransaction, AnswerStoreTransaction = register(
- 0x001A,
- AskStoreTransaction,
- AnswerStoreTransaction)
- AskObject, AnswerObject = register(
- 0x001B,
- AskObject,
- AnswerObject)
- AskTIDs, AnswerTIDs = register(
- 0x001C,
- AskTIDs,
- AnswerTIDs)
- AskTransactionInformation, AnswerTransactionInformation = register(
- 0x001E,
- AskTransactionInformation,
- AnswerTransactionInformation)
- AskObjectHistory, AnswerObjectHistory = register(
- 0x001F,
- AskObjectHistory,
- AnswerObjectHistory)
- AskPartitionList, AnswerPartitionList = register(
- 0x0021,
- AskPartitionList,
- AnswerPartitionList)
- AskNodeList, AnswerNodeList = register(
- 0x0022,
- AskNodeList,
- AnswerNodeList)
- SetNodeState = register(
- 0x0023,
- SetNodeState,
- Error,
- ignore_when_closed=False,
- )
- AddPendingNodes = register(
- 0x0024,
- AddPendingNodes,
- Error,
- ignore_when_closed=False,
- )
- AskNodeInformation, AnswerNodeInformation = register(
- 0x0025,
- AskNodeInformation,
- AnswerNodeInformation)
- SetClusterState = register(
- 0x0026,
- SetClusterState,
- Error,
- ignore_when_closed=False,
- )
- NotifyClusterInformation = register(0x0027, NotifyClusterInformation)
- AskClusterState, AnswerClusterState = register(
- 0x0028,
- AskClusterState,
- AnswerClusterState)
- NotifyLastOID = register(0x0030, NotifyLastOID)
- NotifyReplicationDone = register(0x0031, NotifyReplicationDone)
- AskObjectUndoSerial, AnswerObjectUndoSerial = register(
- 0x0033,
- AskObjectUndoSerial,
- AnswerObjectUndoSerial)
- AskHasLock, AnswerHasLock = register(
- 0x0034,
- AskHasLock,
- AnswerHasLock)
- AskTIDsFrom, AnswerTIDsFrom = register(
- 0x0035,
- AskTIDsFrom,
- AnswerTIDsFrom)
- AskObjectHistoryFrom, AnswerObjectHistoryFrom = register(
- 0x0036,
- AskObjectHistoryFrom,
- AnswerObjectHistoryFrom)
- AskBarrier, AnswerBarrier = register(
- 0x0037,
- AskBarrier,
- AnswerBarrier)
- AskPack, AnswerPack = register(
- 0x0038,
- AskPack,
- AnswerPack,
- ignore_when_closed=False,
- )
- AskCheckTIDRange, AnswerCheckTIDRange = register(
- 0x0039,
- AskCheckTIDRange,
- AnswerCheckTIDRange,
- )
- AskCheckSerialRange, AnswerCheckSerialRange = register(
- 0x003A,
- AskCheckSerialRange,
- AnswerCheckSerialRange,
- )
- NotifyReady = register(0x003B, NotifyReady)
- AskLastTransaction, AnswerLastTransaction = register(
- 0x003C,
- AskLastTransaction,
- AnswerLastTransaction,
- )
- AskCheckCurrentSerial, AnswerCheckCurrentSerial = register(
- 0x003D,
- AskCheckCurrentSerial,
- AnswerCheckCurrentSerial,
- )
-
-# build a "singleton"
-Packets = PacketRegistry()
-
-def register_error(code):
- def wrapper(registry, message=''):
- return Error(code, message)
- return wrapper
-
-class ErrorRegistry(dict):
- """
- Error packet registry
- """
-
- def __init__(self):
- dict.__init__(self)
-
- Ack = register_error(ErrorCodes.ACK)
- ProtocolError = register_error(ErrorCodes.PROTOCOL_ERROR)
- TidNotFound = register_error(ErrorCodes.TID_NOT_FOUND)
- OidNotFound = register_error(ErrorCodes.OID_NOT_FOUND)
- OidDoesNotExist = register_error(ErrorCodes.OID_DOES_NOT_EXIST)
- NotReady = register_error(ErrorCodes.NOT_READY)
- Broken = register_error(ErrorCodes.BROKEN_NODE)
- AlreadyPending = register_error(ErrorCodes.ALREADY_PENDING)
-
-Errors = ErrorRegistry()
-
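For readers following the removed registration code, here is a minimal sketch of how a request code and its answer code are paired in one registry. It is not the NEO implementation: the value of RESPONSE_MASK is an assumption (the diff only shows the name), and the empty Ping, Pong and NotifyReady classes are placeholders for the real packet classes.

```python
# Assumed value: the diff references RESPONSE_MASK but does not show it.
RESPONSE_MASK = 0x8000

registry = {}

def register(code, request, answer=None):
    """Store `request` under `code` and, if given, `answer` under
    `code | RESPONSE_MASK`, refusing duplicate codes."""
    assert code not in registry, "Duplicate request packet code"
    registry[code] = request
    if answer is None:
        return request
    answer_code = code | RESPONSE_MASK
    assert answer_code not in registry, "Duplicate response packet code"
    registry[answer_code] = answer
    return request, answer

# Placeholder packet classes, for illustration only.
class Ping(object):
    pass

class Pong(object):
    pass

class NotifyReady(object):
    pass

register(0x0001, Ping, Pong)   # request/answer pair
register(0x003B, NotifyReady)  # notification: no answer code is created
```

With these assumptions, the answer class is looked up by setting the mask bit on the request code, so a notification occupies a single slot while a request/answer pair occupies two.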
Removed: trunk/neo/pt.py
==============================================================================
--- trunk/neo/pt.py [iso-8859-1] (original)
+++ trunk/neo/pt.py (removed)
@@ -1,387 +0,0 @@
-#
-# Copyright (C) 2006-2010 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-import neo
-
-from neo import protocol
-from neo.protocol import CellStates
-from neo.util import dump, u64
-from neo.locking import RLock
-
-class PartitionTableException(Exception):
- """
- Base class for partition table exceptions
- """
-
-class Cell(object):
- """This class represents a cell in a partition table."""
-
- def __init__(self, node, state = CellStates.UP_TO_DATE):
- self.node = node
- self.state = state
-
- def __repr__(self):
- return "<Cell(uuid=%s, address=%s, state=%s)>" % (
- dump(self.getUUID()),
- self.getAddress(),
- self.getState(),
- )
-
- def getState(self):
- return self.state
-
- def setState(self, state):
- self.state = state
-
- def isUpToDate(self):
- return self.state == CellStates.UP_TO_DATE
-
- def isOutOfDate(self):
- return self.state == CellStates.OUT_OF_DATE
-
- def isFeeding(self):
- return self.state == CellStates.FEEDING
-
- def getNode(self):
- return self.node
-
- def getNodeState(self):
- """This is a short hand."""
- return self.node.getState()
-
- def getUUID(self):
- return self.node.getUUID()
-
- def getAddress(self):
- return self.node.getAddress()
-
-
-class PartitionTable(object):
- """This class manages a partition table."""
-
- def __init__(self, num_partitions, num_replicas):
- self._id = None
- self.np = num_partitions
- self.nr = num_replicas
- self.num_filled_rows = 0
- # Note: don't use [[]] * num_partition construct, as it duplicates
- # instance *references*, so the outer list contains really just one
- # inner list instance.
- self.partition_list = [[] for _ in xrange(num_partitions)]
- self.count_dict = {}
-
- def getID(self):
- return self._id
-
- def getPartitions(self):
- return self.np
-
- def getReplicas(self):
- return self.nr
-
- def clear(self):
- """Forget an existing partition table."""
- self._id = None
- self.num_filled_rows = 0
- # Note: don't use [[]] * self.np construct, as it duplicates
- # instance *references*, so the outer list contains really just one
- # inner list instance.
- self.partition_list = [[] for _ in xrange(self.np)]
- self.count_dict.clear()
-
- def getAssignedPartitionList(self, uuid):
- """ Return the partition assigned to the specified UUID """
- assigned_partitions = []
- for offset in xrange(self.np):
- for cell in self.getCellList(offset, readable=True):
- if cell.getUUID() == uuid:
- assigned_partitions.append(offset)
- break
- return assigned_partitions
-
- def hasOffset(self, offset):
- try:
- return len(self.partition_list[offset]) > 0
- except IndexError:
- return False
-
- def getNodeList(self):
- """Return all used nodes."""
- return [node for node, count in self.count_dict.iteritems() \
- if count > 0]
-
- def getCellList(self, offset, readable=False, writable=False):
- # allow all cell states
- state_set = set(CellStates.values())
- if readable or writable:
- # except non readables
- state_set.remove(CellStates.DISCARDED)
- if readable:
- # except non writables
- state_set.remove(CellStates.OUT_OF_DATE)
- try:
- return [cell for cell in self.partition_list[offset] \
- if cell is not None and cell.getState() in state_set]
- except (TypeError, KeyError):
- return []
-
- def getCellListForTID(self, tid, readable=False, writable=False):
- return self.getCellList(self.getPartition(tid), readable, writable)
-
- def getCellListForOID(self, oid, readable=False, writable=False):
- return self.getCellList(self.getPartition(oid), readable, writable)
-
- def getPartition(self, oid_or_tid):
- return u64(oid_or_tid) % self.getPartitions()
-
- def getOutdatedOffsetListFor(self, uuid):
- return [
- offset for offset in xrange(self.np)
- for c in self.partition_list[offset]
- if c.getUUID() == uuid and c.getState() == CellStates.OUT_OF_DATE
- ]
-
- def isAssigned(self, oid, uuid):
- """ Check if the oid is assigned to the given node """
- for cell in self.partition_list[u64(oid) % self.np]:
- if cell.getUUID() == uuid:
- return True
- return False
-
- def setCell(self, offset, node, state):
- if state == CellStates.DISCARDED:
- return self.removeCell(offset, node)
- if node.isBroken() or node.isDown():
- raise PartitionTableException('Invalid node state')
-
- self.count_dict.setdefault(node, 0)
- row = self.partition_list[offset]
- if len(row) == 0:
- # Create a new row.
- row = [Cell(node, state), ]
- if state != CellStates.FEEDING:
- self.count_dict[node] += 1
- self.partition_list[offset] = row
-
- self.num_filled_rows += 1
- else:
- # XXX this can be slow, but it is necessary to remove a duplicate,
- # if any.
- for cell in row:
- if cell.getNode() == node:
- row.remove(cell)
- if not cell.isFeeding():
- self.count_dict[node] -= 1
- break
- row.append(Cell(node, state))
- if state != CellStates.FEEDING:
- self.count_dict[node] += 1
- return (offset, node.getUUID(), state)
-
- def removeCell(self, offset, node):
- row = self.partition_list[offset]
- assert row is not None
- for cell in row:
- if cell.getNode() == node:
- row.remove(cell)
- if not cell.isFeeding():
- self.count_dict[node] -= 1
- break
- return (offset, node.getUUID(), CellStates.DISCARDED)
-
- def load(self, ptid, row_list, nm):
- """
- Load the partition table with the specified PTID, discard all previous
- content.
- """
- self.clear()
- self._id = ptid
- for offset, row in row_list:
- if offset >= self.getPartitions():
- raise IndexError
- for uuid, state in row:
- node = nm.getByUUID(uuid)
- # the node must be known by the node manager
- assert node is not None
- self.setCell(offset, node, state)
- neo.logging.debug('partition table loaded')
- self.log()
-
- def update(self, ptid, cell_list, nm):
- """
- Update the partition with the cell list supplied. Ignore those changes
- if the partition table ID is not greater than the current one. If a node
- is not known, it is created in the node manager and set as unavailable
- """
- if ptid <= self._id:
- neo.logging.warning('ignoring older partition changes')
- return
- self._id = ptid
- for offset, uuid, state in cell_list:
- node = nm.getByUUID(uuid)
- assert node is not None, 'No node found for uuid %r' % (dump(uuid), )
- self.setCell(offset, node, state)
- neo.logging.debug('partition table updated')
- self.log()
-
- def filled(self):
- return self.num_filled_rows == self.np
-
- def log(self):
- for line in self._format():
- neo.logging.debug(line)
-
- def format(self):
- return '\n'.join(self._format())
-
- def _format(self):
- """Help debugging partition table management.
-
- Output sample:
- DEBUG:root:pt: node 0: ad7ffe8ceef4468a0c776f3035c7a543, R
- DEBUG:root:pt: node 1: a68a01e8bf93e287bd505201c1405bc2, R
- DEBUG:root:pt: node 2: 67ae354b4ed240a0594d042cf5c01b28, R
- DEBUG:root:pt: node 3: df57d7298678996705cd0092d84580f4, R
- DEBUG:root:pt: 00000000: .UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.
- DEBUG:root:pt: 00000009: U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U
-
- Here, there are 4 nodes in RUNNING state.
- The first partition has 2 replicas in UP_TO_DATE state, on nodes 1 and
- 2 (nodes 0 and 3 are displayed as unused for that partition by
- displaying a dot).
- The 8-digits number on the left represents the number of the first
- partition on the line (here, line length is 9 to keep the docstring
- width under 80 column).
- """
- result = []
- append = result.append
- node_list = self.count_dict.keys()
- node_list = [k for k, v in self.count_dict.items() if v != 0]
- node_list.sort()
- node_dict = {}
- for i, node in enumerate(node_list):
- uuid = node.getUUID()
- node_dict[uuid] = i
- append('pt: node %d: %s, %s' % (i, dump(uuid),
- protocol.node_state_prefix_dict[node.getState()]))
- line = []
- max_line_len = 20 # XXX: hardcoded number of partitions per line
- cell_state_dict = protocol.cell_state_prefix_dict
- for offset, row in enumerate(self.partition_list):
- if len(line) == max_line_len:
- append('pt: %08d: %s' % (offset - max_line_len,
- '|'.join(line)))
- line = []
- if row is None:
- line.append('X' * len(node_list))
- else:
- cell = []
- cell_dict = dict([(node_dict.get(x.getUUID(), None), x)
- for x in row])
- for node in xrange(len(node_list)):
- if node in cell_dict:
- cell.append(cell_state_dict[cell_dict[node].getState()])
- else:
- cell.append('.')
- line.append(''.join(cell))
- if len(line):
- append('pt: %08d: %s' % (offset - len(line) + 1,
- '|'.join(line)))
- return result
-
- def operational(self):
- if not self.filled():
- return False
- for row in self.partition_list:
- for cell in row:
- if (cell.isUpToDate() or cell.isFeeding()) and \
- cell.getNode().isRunning():
- break
- else:
- return False
- return True
-
- def getRow(self, offset):
- row = self.partition_list[offset]
- if row is None:
- return []
- return [(cell.getUUID(), cell.getState()) for cell in row]
-
- def getRowList(self):
- getRow = self.getRow
- return [(x, getRow(x)) for x in xrange(self.np)]
-
- def getNodeMap(self):
- """ Return a list of 2-tuple: (uuid, partition_list) """
- uuid_map = {}
- for index, row in enumerate(self.partition_list):
- for cell in row:
- uuid_map.setdefault(cell.getNode(), []).append(index)
- return uuid_map
-
-def thread_safe(method):
- def wrapper(self, *args, **kwargs):
- self.lock()
- try:
- return method(self, *args, **kwargs)
- finally:
- self.unlock()
- return wrapper
-
-
-class MTPartitionTable(PartitionTable):
- """ Thread-safe aware version of the partition table, override only methods
- used in the client """
-
- def __init__(self, *args, **kwargs):
- self._lock = RLock()
- PartitionTable.__init__(self, *args, **kwargs)
-
- def lock(self):
- self._lock.acquire()
-
- def unlock(self):
- self._lock.release()
-
- @thread_safe
- def getCellListForTID(self, *args, **kwargs):
- return PartitionTable.getCellListForTID(self, *args, **kwargs)
-
- @thread_safe
- def getCellListForOID(self, *args, **kwargs):
- return PartitionTable.getCellListForOID(self, *args, **kwargs)
-
- @thread_safe
- def setCell(self, *args, **kwargs):
- return PartitionTable.setCell(self, *args, **kwargs)
-
- @thread_safe
- def clear(self, *args, **kwargs):
- return PartitionTable.clear(self, *args, **kwargs)
-
- @thread_safe
- def operational(self, *args, **kwargs):
- return PartitionTable.operational(self, *args, **kwargs)
-
- @thread_safe
- def getNodeList(self, *args, **kwargs):
- return PartitionTable.getNodeList(self, *args, **kwargs)
-
- @thread_safe
- def getNodeMap(self, *args, **kwargs):
- return PartitionTable.getNodeMap(self, *args, **kwargs)
-
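Two details of the removed partition table code can be illustrated in isolation: the mapping from an OID or TID to a partition offset, and the list-aliasing pitfall mentioned in the constructor comment. This is a sketch only. The u64 helper below is a stand-in that assumes neo.util.u64 unpacks an 8-byte big-endian string, and get_partition is a hypothetical free function mirroring PartitionTable.getPartition.

```python
import struct

def u64(packed):
    """Stand-in for neo.util.u64: assumed to decode 8 big-endian bytes."""
    return struct.unpack('!Q', packed)[0]

def get_partition(oid_or_tid, num_partitions):
    """Map a packed OID or TID to a partition offset by modulo."""
    return u64(oid_or_tid) % num_partitions

# The pitfall: multiplying a list of lists copies references, not lists.
shared = [[]] * 3
shared[0].append('cell')       # visible in all three rows

# The form used in the diff: one fresh list per partition.
independent = [[] for _ in range(3)]
independent[0].append('cell')  # visible in the first row only
```

Under these assumptions, an OID packed from the integer 10 lands in partition 2 of a 4-partition table, and only the list comprehension gives each partition its own row.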
Modified: trunk/neo/scripts/__init__.py
==============================================================================
--- trunk/neo/scripts/__init__.py [iso-8859-1] (original)
+++ trunk/neo/scripts/__init__.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -0,0 +1,3 @@
+import pkg_resources
+pkg_resources.declare_namespace(__name__)
+
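The namespace declaration above lets several separately installed distributions contribute subpackages to one top-level package. The sketch below demonstrates the idea with the standard library's pkgutil.extend_path rather than pkg_resources.declare_namespace, so it is a swapped-in technique and not what this commit uses. The package name demo_ns, the directory layout and both helper functions are hypothetical.

```python
import importlib
import os
import sys
import tempfile

# pkgutil-style namespace declaration, written into each __init__.py.
NAMESPACE_INIT = (
    "from pkgutil import extend_path\n"
    "__path__ = extend_path(__path__, __name__)\n"
)

def build_distribution(root, namespace, subpackage):
    """Create <root>/<namespace>/<subpackage>/ as a tiny fake distribution."""
    package_dir = os.path.join(root, namespace)
    os.makedirs(os.path.join(package_dir, subpackage))
    with open(os.path.join(package_dir, '__init__.py'), 'w') as f:
        f.write(NAMESPACE_INIT)
    with open(os.path.join(package_dir, subpackage, '__init__.py'), 'w') as f:
        f.write("NAME = %r\n" % subpackage)

def demo_namespace(namespace='demo_ns'):
    """Install two fake distributions and import a subpackage from each."""
    base = tempfile.mkdtemp()
    for subpackage in ('lib', 'client'):
        root = os.path.join(base, 'dist_' + subpackage)
        build_distribution(root, namespace, subpackage)
        sys.path.append(root)
    importlib.invalidate_caches()
    lib = importlib.import_module(namespace + '.lib')
    client = importlib.import_module(namespace + '.client')
    return lib.NAME, client.NAME
```

Calling demo_namespace() imports demo_ns.lib from one directory and demo_ns.client from another, which is the property the commit log is after when it describes neo as a virtual namespace.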
Modified: trunk/neo/scripts/neoadmin.py
==============================================================================
--- trunk/neo/scripts/neoadmin.py [iso-8859-1] (original)
+++ trunk/neo/scripts/neoadmin.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,8 +17,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from optparse import OptionParser
-from neo import setupLog
-from neo.config import ConfigurationManager
+from neo.lib import setupLog
+from neo.lib.config import ConfigurationManager
parser = OptionParser()
parser.add_option('-u', '--uuid', help='specify an UUID to use for this ' \
Modified: trunk/neo/scripts/neoctl.py
==============================================================================
--- trunk/neo/scripts/neoctl.py [iso-8859-1] (original)
+++ trunk/neo/scripts/neoctl.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -18,7 +18,7 @@
import sys
from optparse import OptionParser
-from neo import setupLog
+from neo.lib import setupLog
parser = OptionParser()
parser.add_option('-v', '--verbose', action = 'store_true',
Modified: trunk/neo/scripts/neomaster.py
==============================================================================
--- trunk/neo/scripts/neomaster.py [iso-8859-1] (original)
+++ trunk/neo/scripts/neomaster.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -17,8 +17,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from optparse import OptionParser
-from neo import setupLog
-from neo.config import ConfigurationManager
+from neo.lib import setupLog
+from neo.lib.config import ConfigurationManager
parser = OptionParser()
parser.add_option('-v', '--verbose', action = 'store_true',
Modified: trunk/neo/scripts/neomigrate.py
==============================================================================
--- trunk/neo/scripts/neomigrate.py [iso-8859-1] (original)
+++ trunk/neo/scripts/neomigrate.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -23,7 +23,7 @@ import logging
import time
import os
-from neo import setupLog
+from neo.lib import setupLog
# register options
parser = OptionParser()
Modified: trunk/neo/scripts/neostorage.py
==============================================================================
--- trunk/neo/scripts/neostorage.py [iso-8859-1] (original)
+++ trunk/neo/scripts/neostorage.py [iso-8859-1] Mon Jan 17 16:25:37 2011
@@ -19,8 +19,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from optparse import OptionParser
-from neo import setupLog
-from neo.config import ConfigurationManager
+from neo.lib import setupLog
+from neo.lib.config import ConfigurationManager
parser = OptionParser()