[Neo-report] r2143 vincent - in /trunk/neo: ./ client/ tests/
nobody at svn.erp5.org
nobody at svn.erp5.org
Mon Jun 7 15:47:57 CEST 2010
Author: vincent
Date: Mon Jun 7 15:47:54 2010
New Revision: 2143
Log:
Add back "queue" parameter on MTClientConnection.ask
Make it optional, to suit "ping" use, but check that it's always passed
except in that special case.
Modified:
trunk/neo/client/app.py
trunk/neo/client/pool.py
trunk/neo/connection.py
trunk/neo/tests/testConnection.py
Modified: trunk/neo/client/app.py
==============================================================================
--- trunk/neo/client/app.py [iso-8859-1] (original)
+++ trunk/neo/client/app.py [iso-8859-1] Mon Jun 7 15:47:54 2010
@@ -260,14 +260,14 @@
@profiler_decorator
def _askStorage(self, conn, packet):
""" Send a request to a storage node and process it's answer """
- msg_id = conn.ask(packet)
+ msg_id = conn.ask(packet, queue=self.local_var.queue)
self._waitMessage(conn, msg_id, self.storage_handler)
@profiler_decorator
def _askPrimary(self, packet):
""" Send a request to the primary master and process it's answer """
conn = self._getMasterConnection()
- msg_id = conn.ask(packet)
+ msg_id = conn.ask(packet, queue=self.local_var.queue)
self._waitMessage(conn, msg_id, self.primary_handler)
@profiler_decorator
@@ -308,6 +308,7 @@
logging.debug('connecting to primary master...')
ready = False
nm = self.nm
+ queue = self.local_var.queue
while not ready:
# Get network connection to primary master
index = 0
@@ -328,7 +329,7 @@
self.trying_master_node = master_list[0]
index += 1
# Connect to master
- conn = MTClientConnection(self.local_var, self.em,
+ conn = MTClientConnection(self.em,
self.notifications_handler,
addr=self.trying_master_node.getAddress(),
connector=self.connector_handler(),
@@ -339,7 +340,7 @@
logging.error('Connection to master node %s failed',
self.trying_master_node)
continue
- msg_id = conn.ask(Packets.AskPrimary())
+ msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
try:
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
@@ -359,7 +360,7 @@
break
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name)
- msg_id = conn.ask(p)
+ msg_id = conn.ask(p, queue=queue)
try:
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
@@ -370,10 +371,10 @@
# Node identification was refused by master.
time.sleep(1)
if self.uuid is not None:
- msg_id = conn.ask(Packets.AskNodeInformation())
+ msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
- msg_id = conn.ask(Packets.AskPartitionTable())
+ msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
ready = self.uuid is not None and self.pt is not None \
@@ -597,12 +598,13 @@
self.local_var.object_stored_counter_dict[oid] = {}
self.local_var.object_serial_dict[oid] = (serial, version)
getConnForCell = self.cp.getConnForCell
+ queue = self.local_var.queue
for cell in cell_list:
conn = getConnForCell(cell)
if conn is None:
continue
try:
- conn.ask(p, on_timeout=on_timeout)
+ conn.ask(p, on_timeout=on_timeout, queue=queue)
except ConnectionClosed:
continue
@@ -870,9 +872,10 @@
undo_error_oid_list = self.local_var.undo_error_oid_list = []
ask_undo_transaction = Packets.AskUndoTransaction(tid, undone_tid)
getConnForNode = self.cp.getConnForNode
+ queue = self.local_var.queue
for storage_node in self.nm.getStorageList():
storage_conn = getConnForNode(storage_node)
- storage_conn.ask(ask_undo_transaction)
+ storage_conn.ask(ask_undo_transaction, queue=queue)
# Wait for all AnswerUndoTransaction.
self.waitResponses()
@@ -927,11 +930,12 @@
storage_node_list = pt.getNodeList()
self.local_var.node_tids = {}
+ queue = self.local_var.queue
for storage_node in storage_node_list:
conn = self.cp.getConnForNode(storage_node)
if conn is None:
continue
- conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
+ conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION), queue=queue)
# Wait for answers from all storages.
while len(self.local_var.node_tids) != len(storage_node_list):
Modified: trunk/neo/client/pool.py
==============================================================================
--- trunk/neo/client/pool.py [iso-8859-1] (original)
+++ trunk/neo/client/pool.py [iso-8859-1] Mon Jun 7 15:47:54 2010
@@ -50,7 +50,7 @@
while True:
logging.debug('trying to connect to %s - %s', node, node.getState())
app.setNodeReady()
- conn = MTClientConnection(app.local_var, app.em,
+ conn = MTClientConnection(app.em,
app.storage_event_handler, addr,
connector=app.connector_handler(), dispatcher=app.dispatcher)
conn.lock()
@@ -63,7 +63,7 @@
p = Packets.RequestIdentification(NodeTypes.CLIENT,
app.uuid, None, app.name)
- msg_id = conn.ask(p)
+ msg_id = conn.ask(p, queue=app.local_var.queue)
finally:
conn.unlock()
Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Mon Jun 7 15:47:54 2010
@@ -684,10 +684,9 @@
class MTClientConnection(ClientConnection):
"""A Multithread-safe version of ClientConnection."""
- def __init__(self, local_var, *args, **kwargs):
+ def __init__(self, *args, **kwargs):
# _lock is only here for lock debugging purposes. Do not use.
self._lock = lock = RLock()
- self._local_var = local_var
self.acquire = lock.acquire
self.release = lock.release
self.dispatcher = kwargs.pop('dispatcher')
@@ -723,7 +722,8 @@
self.unlock()
@profiler_decorator
- def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None):
+ def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None,
+ queue=None):
self.lock()
try:
# XXX: Here, we duplicate Connection.ask because we need to call
@@ -731,7 +731,12 @@
# _addPacket is called.
msg_id = self._getNextId()
packet.setId(msg_id)
- self.dispatcher.register(self, msg_id, self._local_var.queue)
+ if queue is None:
+ if not isinstance(packet, Packets.Ping):
+ raise TypeError, 'Only Ping packet can be asked ' \
+ 'without a queue, got a %r.' % (packet, )
+ else:
+ self.dispatcher.register(self, msg_id, queue)
self._addPacket(packet)
t = time()
# If there is no pending request, initialise timeout values.
Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Mon Jun 7 15:47:54 2010
@@ -27,6 +27,7 @@
from neo.protocol import Packets, ParserState
from neo.tests import NeoTestBase
from neo.util import ReadBuffer
+from neo.locking import Queue
class ConnectionTests(NeoTestBase):
@@ -808,6 +809,30 @@
self.assertEqual(bc.aborted, True)
self.assertTrue(bc.isServer())
+class MTConnectionTests(ConnectionTests):
+ # XXX: here we test non-client-connection-related things too, which
+ # duplicates test suite work... Should be fragmented into finer-grained
+ # test classes.
+
+ def setUp(self):
+ super(MTConnectionTests, self).setUp()
+ self.dispatcher = Mock({'__repr__': 'Fake Dispatcher'})
+
+ def _makeClientConnection(self):
+ self.connector = DoNothingConnector()
+ return MTClientConnection(event_manager=self.em, handler=self.handler,
+ connector=self.connector, addr=self.address,
+ dispatcher=self.dispatcher)
+
+ def test_MTClientConnectionQueueParameter(self):
+ queue = Queue()
+ ask = self._makeClientConnection().ask
+ packet = Packets.AskPrimary() # Any non-Ping simple "ask" packet
+ # One cannot "ask" anything without a queue
+ self.assertRaises(TypeError, ask, packet)
+ ask(packet, queue=queue)
+ # ... except Ping
+ ask(Packets.Ping())
class HandlerSwitcherTests(NeoTestBase):
More information about the Neo-report
mailing list