[Neo-report] r1895 gregory - in /trunk/neo: ./ tests/ tests/master/
nobody at svn.erp5.org
nobody at svn.erp5.org
Fri Mar 5 22:23:06 CET 2010
Author: gregory
Date: Fri Mar 5 22:23:06 2010
New Revision: 1895
Log:
Per-connection timeout support (instead of per-packet).
- Rename IdleEvent to IdleTimeout from event.py to connection.py
- Move connection-related logic in Connection itself and keep only
time-related logic in IdleTimeout
- Clarify differences between hard and soft timeouts.
- Remove (unused) 'additional_timeout' from ask()
- Remove (now useless) event_dict attribute from Connection.
- Remove external ping support, as the answer can not be handled at
application level.
- Expectation after a new incoming connection moved from Handler to
Connection.
- Fix (and clean) related tests.
Modified:
trunk/neo/connection.py
trunk/neo/event.py
trunk/neo/handler.py
trunk/neo/tests/master/testElectionHandler.py
trunk/neo/tests/testConnection.py
trunk/neo/tests/testEvent.py
Modified: trunk/neo/connection.py
==============================================================================
--- trunk/neo/connection.py [iso-8859-1] (original)
+++ trunk/neo/connection.py [iso-8859-1] Fri Mar 5 22:23:06 2010
@@ -15,11 +15,12 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+from time import time
+
from neo import logging
from neo.locking import RLock
from neo.protocol import PacketMalformedError, Packets
-from neo.event import IdleEvent
from neo.connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException, \
ConnectorConnectionClosedException
@@ -27,6 +28,11 @@
from neo.logger import PACKET_LOGGER
from neo import attributeTracker
+
+PING_DELAY = 5
+PING_TIMEOUT = 5
+INCOMING_TIMEOUT = 10
+CRITICAL_TIMEOUT = 30
APPLY_HANDLER = object()
@@ -120,6 +126,34 @@
self._pending.append([{}, handler])
+class Timeout(object):
+ """ Keep track of current timeouts """
+
+ def __init__(self):
+ self._ping_time = None
+ self._critical_time = None
+
+ def update(self, t, timeout=CRITICAL_TIMEOUT):
+ """ Update the new critical time """
+ self._ping_time = t + PING_TIMEOUT
+ critical_time = self._ping_time + timeout
+ self._critical_time = max(critical_time, self._critical_time)
+
+ def refresh(self, t):
+ """ Refresh timeout after something received """
+ self._ping_time = t + PING_DELAY
+
+ def softExpired(self, t):
+ """ Indicate if the soft timeout (ping delay) is reached """
+ # hard timeout takes precedences
+ return self._ping_time < t < self._critical_time
+
+ def hardExpired(self, t):
+ """ Indicate if hard (or pong) timeout is reached """
+ # should be called if softExpired if False
+ return self._critical_time < t or self._ping_time < t
+
+
class BaseConnection(object):
"""A base connection."""
@@ -129,7 +163,19 @@
self.connector = connector
self.addr = addr
self._handlers = HandlerSwitcher(self, handler)
+ self._timeout = Timeout()
event_manager.register(self)
+
+ def checkTimeout(self, t):
+ if self._handlers.isPending():
+ if self._timeout.softExpired(t):
+ self._timeout.refresh(t)
+ self.ping()
+ elif self._timeout.hardExpired(t):
+ # critical time reach or pong not received, abort
+ logging.info('timeout with %s:%d', *(self.getAddress()))
+ self.close()
+ self.getHandler().timeoutExpired(self)
def lock(self):
return 1
@@ -215,6 +261,8 @@
handler = self.getHandler()
new_conn = ServerConnection(self.getEventManager(), handler,
connector=new_s, addr=addr)
+ # A request for a node identification should arrive.
+ self._timeout.update(time(), timeout=INCOMING_TIMEOUT)
handler.connectionAccepted(new_conn)
except ConnectorTryAgainException:
pass
@@ -236,7 +284,6 @@
self.write_buf = []
self.cur_id = 0
self.peer_id = 0
- self.event_dict = {}
self.aborted = False
self.uuid = None
self._queue = []
@@ -271,12 +318,9 @@
logging.debug('closing a connector for %s (%s:%d)',
dump(self.uuid), *(self.addr))
BaseConnection.close(self)
- for event in self.event_dict.itervalues():
- self.em.removeIdleEvent(event)
if self._on_close is not None:
self._on_close()
self._on_close = None
- self.event_dict.clear()
del self.write_buf[:]
del self.read_buf[:]
self._handlers.clear()
@@ -320,24 +364,14 @@
except PacketMalformedError, msg:
self.getHandler()._packetMalformed(self, msg)
return
+ self._timeout.refresh(time())
msg = msg[len(packet):]
-
packet_type = packet.getType()
- # Remove idle events, if appropriate packets were received.
- for msg_id in (None, packet.getId()):
- event = self.event_dict.pop(msg_id, None)
- if event is not None:
- if packet_type == Packets.Pong:
- self.em.refreshIdleEvent(event)
- self.event_dict[msg_id] = event
- else:
- self.em.removeIdleEvent(event)
-
if packet_type == Packets.Ping:
# Send a pong notification
self.answer(Packets.Pong(), packet.getId())
elif packet_type != Packets.Pong:
- # Skip PONG packets, its only purpose is to drop IdleEvent
+ # Skip PONG packets, its only purpose is refresh the timeout
# generated upong ping.
self._queue.append(packet)
self.read_buf = [msg]
@@ -434,33 +468,6 @@
# enable polling for writing.
self.em.addWriter(self)
- def expectMessage(self, msg_id=None, timeout=5, additional_timeout=30):
- """Expect a message for a reply to a given message ID or any message.
-
- The purpose of this method is to define how much amount of time is
- acceptable to wait for a message, thus to detect a down or broken
- peer. This is important, because one error may halt a whole cluster
- otherwise. Although TCP defines a keep-alive feature, the timeout
- is too long generally, and it does not detect a certain type of reply,
- thus it is better to probe problems at the application level.
-
- The message ID specifies what ID is expected. Usually, this should
- be identical with an ID for a request message. If it is None, any
- message is acceptable, so it can be used to check idle time.
-
- The timeout is the amount of time to wait until keep-alive messages start.
- Once the timeout is expired, the connection starts to ping the peer.
-
- The additional timeout defines the amount of time after the timeout
- to invoke a timeoutExpired callback. If it is zero, no ping is sent, and
- the callback is executed immediately."""
- if self.connector is None:
- return
-
- event = IdleEvent(self, msg_id, timeout, additional_timeout)
- self.event_dict[msg_id] = event
- self.em.addIdleEvent(event)
-
@not_closed
def notify(self, packet):
""" Then a packet with a new ID """
@@ -470,15 +477,15 @@
return msg_id
@not_closed
- def ask(self, packet, timeout=5, additional_timeout=30):
+ def ask(self, packet, timeout=CRITICAL_TIMEOUT):
"""
Send a packet with a new ID and register the expectation of an answer
"""
msg_id = self._getNextId()
packet.setId(msg_id)
- self.expectMessage(msg_id, timeout=timeout,
- additional_timeout=additional_timeout)
self._addPacket(packet)
+ if not self._handlers.isPending():
+ self._timeout.update(time(), timeout=timeout)
self._handlers.emit(packet)
return msg_id
@@ -491,13 +498,10 @@
assert packet.isResponse(), packet
self._addPacket(packet)
- def ping(self, timeout=5, msg_id=None):
- """ Send a ping and expect to receive a pong notification """
+ @not_closed
+ def ping(self):
packet = Packets.Ping()
- if msg_id is None:
- msg_id = self._getNextId()
- self.expectMessage(msg_id, timeout, 0)
- packet.setId(msg_id)
+ packet.setId(self._getNextId())
self._addPacket(packet)
@@ -583,26 +587,27 @@
return super(MTClientConnection, self).analyse(*args, **kw)
@lockCheckWrapper
- def expectMessage(self, *args, **kw):
- return super(MTClientConnection, self).expectMessage(*args, **kw)
-
- @lockCheckWrapper
def notify(self, *args, **kw):
return super(MTClientConnection, self).notify(*args, **kw)
@lockCheckWrapper
- def ask(self, queue, packet, timeout=5, additional_timeout=30):
+ def ask(self, queue, packet, timeout=CRITICAL_TIMEOUT):
msg_id = self._getNextId()
packet.setId(msg_id)
self.dispatcher.register(self, msg_id, queue)
- self.expectMessage(msg_id)
self._addPacket(packet)
+ if not self._handlers.isPending():
+ self._timeout.update(time(), timeout=timeout)
self._handlers.emit(packet)
return msg_id
@lockCheckWrapper
def answer(self, *args, **kw):
return super(MTClientConnection, self).answer(*args, **kw)
+
+ @lockCheckWrapper
+ def checkTimeout(self, *args, **kw):
+ return super(MTClientConnection, self).checkTimeout(*args, **kw)
def close(self):
self.lock()
@@ -645,10 +650,6 @@
return super(MTServerConnection, self).analyse(*args, **kw)
@lockCheckWrapper
- def expectMessage(self, *args, **kw):
- return super(MTServerConnection, self).expectMessage(*args, **kw)
-
- @lockCheckWrapper
def notify(self, *args, **kw):
return super(MTServerConnection, self).notify(*args, **kw)
@@ -660,3 +661,7 @@
def answer(self, *args, **kw):
return super(MTServerConnection, self).answer(*args, **kw)
+ @lockCheckWrapper
+ def checkTimeout(self, *args, **kw):
+ return super(MTServerConnection, self).checkTimeout(*args, **kw)
+
Modified: trunk/neo/event.py
==============================================================================
--- trunk/neo/event.py [iso-8859-1] (original)
+++ trunk/neo/event.py [iso-8859-1] Fri Mar 5 22:23:06 2010
@@ -15,73 +15,9 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-from neo import logging
from time import time
from neo.epoll import Epoll
-
-PING_DELAY = 5
-PING_TIMEOUT = 5
-
-class IdleEvent(object):
- """
- This class represents an event called when a connection is waiting for
- a message too long.
- """
-
- def __init__(self, conn, msg_id, timeout, additional_timeout):
- self._conn = conn
- self._id = msg_id
- t = time()
- self._time = t + timeout
- self._critical_time = t + timeout + additional_timeout
- self.refresh()
-
- def getId(self):
- return self._id
-
- def getTime(self):
- return self._time
-
- def getCriticalTime(self):
- return self._critical_time
-
- def refresh(self):
- self._next_critical_time = self._critical_time
-
- def __call__(self, t):
- conn = self._conn
- if t > self._next_critical_time:
- # No answer after _critical_time, close connection.
- # This means that remote peer is processing the request for too
- # long, although being responsive at network level.
- logging.info('timeout for %r with %s:%d',
- self._id, *(conn.getAddress()))
- conn.lock()
- try:
- conn.close()
- conn.getHandler().timeoutExpired(conn)
- finally:
- conn.unlock()
- return True
- elif t > self._time:
- # Still no answer after _time, send a ping to see if connection is
- # broken.
- # XXX: This code has no meaning if the remote peer is single-
- # threaded. Nevertheless, it should be kept in case it gets
- # multithreaded, someday (master & storage are the only candidates
- # for using this code, as other don't receive requests).
- conn.lock()
- try:
- conn.ping(msg_id=self._id)
- finally:
- conn.unlock()
- # Don't retry pinging after at least PING_DELAY seconds have
- # passed.
- self._time = t + PING_DELAY
- self._next_critical_time = min(self._critical_time,
- t + PING_TIMEOUT)
- return False
class EpollEventManager(object):
"""This class manages connections and events based on epoll(5)."""
@@ -90,7 +26,6 @@
self.connection_dict = {}
self.reader_set = set([])
self.writer_set = set([])
- self.event_list = []
self.prev_time = time()
self.epoll = Epoll()
self._pending_processing = []
@@ -164,6 +99,7 @@
self._addPendingConnection(to_process)
def _poll(self, timeout = 1):
+ assert timeout >= 0
rlist, wlist = self.epoll.poll(timeout)
r_done_set = set()
for fd in rlist:
@@ -196,32 +132,13 @@
finally:
conn.unlock()
- # Check idle events. Do not check them out too often, because this
- # is somehow heavy.
- event_list = self.event_list
- if event_list:
- t = time()
- if t - self.prev_time >= 1:
- self.prev_time = t
- event_list.sort(key = lambda event: event.getTime())
- while event_list:
- event = event_list[0]
- if event(t):
- self.removeIdleEvent(event)
- else:
- break
-
- def addIdleEvent(self, event):
- self.event_list.append(event)
-
- def removeIdleEvent(self, event):
- try:
- self.event_list.remove(event)
- except ValueError:
- pass
-
- def refreshIdleEvent(self, event):
- event.refresh()
+ t = time()
+ for conn in self.connection_dict.values():
+ conn.lock()
+ try:
+ conn.checkTimeout(t)
+ finally:
+ conn.unlock()
def addReader(self, conn):
connector = conn.getConnector()
Modified: trunk/neo/handler.py
==============================================================================
--- trunk/neo/handler.py [iso-8859-1] (original)
+++ trunk/neo/handler.py [iso-8859-1] Fri Mar 5 22:23:06 2010
@@ -109,8 +109,6 @@
def connectionAccepted(self, conn):
"""Called when a connection is accepted."""
- # A request for a node identification should arrive.
- conn.expectMessage(timeout = 10, additional_timeout = 0)
def timeoutExpired(self, conn):
"""Called when a timeout event occurs."""
Modified: trunk/neo/tests/master/testElectionHandler.py
==============================================================================
--- trunk/neo/tests/master/testElectionHandler.py [iso-8859-1] (original)
+++ trunk/neo/tests/master/testElectionHandler.py [iso-8859-1] Fri Mar 5 22:23:06 2010
@@ -30,10 +30,6 @@
def _addPacket(self, packet):
if self.connector is not None:
self.connector._addPacket(packet)
-def expectMessage(self, packet, timeout=5, additional_timeout=30):
- if self.connector is not None:
- self.connector.expectMessage(packet)
-
class MasterClientElectionTests(NeoTestBase):
@@ -56,14 +52,11 @@
self.master_port = 10011
# apply monkey patches
self._addPacket = ClientConnection._addPacket
- self.expectMessage = ClientConnection.expectMessage
ClientConnection._addPacket = _addPacket
- ClientConnection.expectMessage = expectMessage
def tearDown(self):
# restore patched methods
ClientConnection._addPacket = self._addPacket
- ClientConnection.expectMessage = self.expectMessage
NeoTestBase.tearDown(self)
def identifyToMasterNode(self):
@@ -220,15 +213,12 @@
self.master_address = ('127.0.0.1', 3000)
# apply monkey patches
self._addPacket = ClientConnection._addPacket
- self.expectMessage = ClientConnection.expectMessage
ClientConnection._addPacket = _addPacket
- ClientConnection.expectMessage = expectMessage
def tearDown(self):
NeoTestBase.tearDown(self)
# restore environnement
ClientConnection._addPacket = self._addPacket
- ClientConnection.expectMessage = self.expectMessage
def identifyToMasterNode(self, uuid=True):
node = self.app.nm.getMasterList()[0]
Modified: trunk/neo/tests/testConnection.py
==============================================================================
--- trunk/neo/tests/testConnection.py [iso-8859-1] (original)
+++ trunk/neo/tests/testConnection.py [iso-8859-1] Fri Mar 5 22:23:06 2010
@@ -15,10 +15,11 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import unittest
+from time import time
from mock import Mock
from neo.connection import ListeningConnection, Connection, \
ClientConnection, ServerConnection, MTClientConnection, \
- MTServerConnection, HandlerSwitcher
+ MTServerConnection, HandlerSwitcher, Timeout
from neo.connector import getConnectorHandler, registerConnectorHandler
from neo.tests import DoNothingConnector
from neo.connector import ConnectorException, ConnectorTryAgainException, \
@@ -121,12 +122,6 @@
calls = self.connector.mockGetNamedCalls("makeClientConnection")
self.assertEqual(len(calls), n)
self.assertEqual(calls[n-1].getParam(0), self.address)
-
- def _checkAddIdleEvent(self, n=1):
- self.assertEquals(len(self.em.mockGetNamedCalls("addIdleEvent")), n)
-
- def _checkRemoveIdleEvent(self, n=1):
- self.assertEquals(len(self.em.mockGetNamedCalls("removeIdleEvent")), n)
def _checkPacketReceived(self, n=1):
calls = self.handler.mockGetNamedCalls('packetReceived')
@@ -192,7 +187,6 @@
self._checkReadBuf(bc, '')
self._checkWriteBuf(bc, '')
self.assertEqual(bc.cur_id, 0)
- self.assertEqual(bc.event_dict, {})
self.assertEqual(bc.aborted, False)
# test uuid
self.assertEqual(bc.uuid, None)
@@ -377,25 +371,14 @@
self._checkWriteBuf(bc, 'testdata')
self._checkWriterAdded(1)
- def test_08_Connection_expectMessage(self):
- # with a right connector -> event created
- bc = self._makeConnection()
- self.assertEqual(len(bc.event_dict), 0)
- bc.expectMessage('1')
- self.assertEqual(len(bc.event_dict), 1)
- self._checkAddIdleEvent(1)
-
def test_Connection_analyse1(self):
# nothing to read, nothing is done
bc = self._makeConnection()
bc._queue = Mock()
self._checkReadBuf(bc, '')
- self.assertEqual(len(bc.event_dict), 0)
bc.analyse()
- self._checkRemoveIdleEvent(0)
self._checkPacketReceived(0)
self._checkReadBuf(bc, '')
- self.assertEqual(len(bc.event_dict), 0)
# give some data to analyse
master_list = (
@@ -410,17 +393,14 @@
p = Packets.AnswerPrimary(self.getNewUUID(), master_list)
p.setId(1)
bc.read_buf += p.encode()
- self.assertEqual(len(bc.event_dict), 0)
bc.analyse()
# check packet decoded
- self._checkRemoveIdleEvent(0)
self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 1)
call = bc._queue.mockGetNamedCalls("append")[0]
data = call.getParam(0)
self.assertEqual(data.getType(), p.getType())
self.assertEqual(data.getId(), p.getId())
self.assertEqual(data.decode(), p.decode())
- self.assertEqual(len(bc.event_dict), 0)
self._checkReadBuf(bc, '')
def test_Connection_analyse2(self):
@@ -454,10 +434,8 @@
p2.setId(2)
bc.read_buf += p2.encode()
self.assertEqual(len(''.join(bc.read_buf)), len(p1) + len(p2))
- self.assertEqual(len(bc.event_dict), 0)
bc.analyse()
# check two packets decoded
- self._checkRemoveIdleEvent(0)
self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 2)
# packet 1
call = bc._queue.mockGetNamedCalls("append")[0]
@@ -471,7 +449,6 @@
self.assertEqual(data.getType(), p2.getType())
self.assertEqual(data.getId(), p2.getId())
self.assertEqual(data.decode(), p2.decode())
- self.assertEqual(len(bc.event_dict), 0)
self._checkReadBuf(bc, '')
def test_Connection_analyse3(self):
@@ -480,11 +457,9 @@
bc._queue = Mock()
bc.read_buf += "datadatadatadata"
self.assertEqual(len(bc.read_buf), 16)
- self.assertEqual(len(bc.event_dict), 0)
bc.analyse()
self.assertEqual(len(bc.read_buf), 16)
self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 0)
- self._checkRemoveIdleEvent(0)
def test_Connection_analyse4(self):
# give an expected packet
@@ -502,19 +477,14 @@
p = Packets.AnswerPrimary(self.getNewUUID(), master_list)
p.setId(1)
bc.read_buf += p.encode()
- self.assertEqual(len(bc.event_dict), 0)
- bc.expectMessage(1)
- self.assertEqual(len(bc.event_dict), 1)
bc.analyse()
# check packet decoded
- self._checkRemoveIdleEvent(1)
self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 1)
call = bc._queue.mockGetNamedCalls("append")[0]
data = call.getParam(0)
self.assertEqual(data.getType(), p.getType())
self.assertEqual(data.getId(), p.getId())
self.assertEqual(data.decode(), p.decode())
- self.assertEqual(len(bc.event_dict), 0)
self.assertEqual(''.join(bc.read_buf), '')
def test_Connection_writable1(self):
@@ -614,13 +584,11 @@
bc.readable()
# check packet decoded
self._checkReadBuf(bc, '')
- self._checkRemoveIdleEvent(0)
self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 1)
call = bc._queue.mockGetNamedCalls("append")[0]
data = call.getParam(0)
self.assertEqual(data.getType(), Packets.AnswerPrimary)
self.assertEqual(data.getId(), 1)
- self.assertEqual(len(bc.event_dict), 0)
self._checkReadBuf(bc, '')
# check not aborted
self.assertFalse(bc.aborted)
@@ -763,7 +731,6 @@
self._checkReadBuf(bc, '')
self._checkWriteBuf(bc, '')
self.assertEqual(bc.cur_id, 0)
- self.assertEqual(bc.event_dict, {})
self.assertEqual(bc.aborted, False)
# test uuid
self.assertEqual(bc.uuid, None)
@@ -941,5 +908,51 @@
self.checkAborted(self._connection)
+class TestTimeout(NeoTestBase):
+ """ assume PING_DELAY=5 """
+
+ def setUp(self):
+ self.initial = time()
+ self.current = self.initial
+ self.timeout = Timeout()
+
+ def checkAfter(self, n, soft, hard):
+ at = self.current + n
+ self.assertEqual(soft, self.timeout.softExpired(at))
+ self.assertEqual(hard, self.timeout.hardExpired(at))
+
+ def refreshAfter(self, n):
+ self.current += n
+ self.timeout.refresh(self.current)
+
+ def testNoTimeout(self):
+ self.timeout.update(self.initial, 5)
+ self.checkAfter(1, False, False)
+ self.checkAfter(4, False, False)
+ self.refreshAfter(4) # answer received
+ self.checkAfter(1, False, False)
+
+ def testSoftTimeout(self):
+ self.timeout.update(self.initial, 5)
+ self.checkAfter(1, False, False)
+ self.checkAfter(4, False, False)
+ self.checkAfter(6, True, True) # ping
+ self.refreshAfter(8) # pong
+ self.checkAfter(1, False, False)
+ self.checkAfter(4, False, True)
+
+ def testHardTimeout(self):
+ self.timeout.update(self.initial, 5)
+ self.checkAfter(1, False, False)
+ self.checkAfter(4, False, False)
+ self.checkAfter(6, True, True) # ping
+ self.refreshAfter(6) # pong
+ self.checkAfter(1, False, False)
+ self.checkAfter(4, False, False)
+ self.checkAfter(6, False, True) # ping
+ self.refreshAfter(6) # pong
+ self.checkAfter(1, False, True) # too late
+ self.checkAfter(5, False, True)
+
if __name__ == '__main__':
unittest.main()
Modified: trunk/neo/tests/testEvent.py
==============================================================================
--- trunk/neo/tests/testEvent.py [iso-8859-1] (original)
+++ trunk/neo/tests/testEvent.py [iso-8859-1] Fri Mar 5 22:23:06 2010
@@ -19,7 +19,7 @@
from time import time
from neo.tests import NeoTestBase
from neo.epoll import Epoll
-from neo.event import EpollEventManager, IdleEvent
+from neo.event import EpollEventManager
class EventTests(NeoTestBase):
@@ -35,7 +35,6 @@
self.assertEqual(len(em.connection_dict), 0)
self.assertEqual(len(em.reader_set), 0)
self.assertEqual(len(em.writer_set), 0)
- self.assertEqual(len(em.event_list), 0)
self.assertTrue(em.prev_time <time)
self.assertTrue(isinstance(em.epoll, Epoll))
# use a mock object instead of epoll
@@ -62,16 +61,6 @@
data = call.getParam(0)
self.assertEqual(data, 1014)
self.assertEqual(len(em.getConnectionList()), 0)
-
- # add/removeIdleEvent
- event = Mock()
- self.assertEqual(len(em.event_list), 0)
- em.addIdleEvent(event)
- self.assertEqual(len(em.event_list), 1)
- em.removeIdleEvent(event)
- self.assertEqual(len(em.event_list), 0)
- em.removeIdleEvent(event) # must not fail
- self.assertEqual(len(em.event_list), 0)
# add/removeReader
connector = Mock({"getDescriptor" : 1515})
@@ -136,102 +125,6 @@
#self.assertEquals(len(w_conn.mockGetNamedCalls("readable")), 0)
#self.assertEquals(len(w_conn.mockGetNamedCalls("writable")), 1)
- def test_02_IdleEvent(self):
- # test init
- handler = Mock()
- conn = Mock({"getAddress" : ("127.9.9.9", 135),
- "getHandler" : handler})
- event = IdleEvent(conn, 1, 10, 20)
- self.assertEqual(event.getId(), 1)
- self.assertNotEqual(event.getTime(), None)
- time = event.getTime()
- self.assertNotEqual(event.getCriticalTime(), None)
- critical_time = event.getCriticalTime()
- self.assertEqual(critical_time, time+20)
-
- # call with t < time < critical_time
- t = time - 10
- r = event(t)
- self.assertFalse(r)
- self.assertEquals(len(conn.mockGetNamedCalls("lock")), 0)
- self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0)
- self.assertEquals(len(conn.mockGetNamedCalls("close")), 0)
- self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 0)
- self.checkNoPacketSent(conn)
- self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0)
-
- # call with time < t < critical_time
- t = time + 5
- self.assertTrue(t < critical_time)
- r = event(t)
- self.assertFalse(r)
- self.assertEquals(len(conn.mockGetNamedCalls("lock")), 1)
- self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0)
- self.assertEquals(len(conn.mockGetNamedCalls("close")), 0)
- self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 1)
- self.assertEquals(len(conn.mockGetNamedCalls("ping")), 1)
- self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0)
-
- # call with time < critical_time < t
- t = critical_time + 5
- self.assertTrue(t > critical_time)
- r = event(t)
- self.assertTrue(r)
- self.assertEquals(len(conn.mockGetNamedCalls("lock")), 2)
- self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 1)
- self.assertEquals(len(conn.mockGetNamedCalls("close")), 1)
- self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 2)
- self.assertEquals(len(conn.mockGetNamedCalls("ping")), 1)
- self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 1)
-
- # same test with additional time < 5
- # test init
- handler = Mock()
- conn = Mock({"getAddress" : ("127.9.9.9", 135),
- "getHandler" : handler})
- event = IdleEvent(conn, 1, 10, 3)
- self.assertEqual(event.getId(), 1)
- self.assertNotEqual(event.getTime(), None)
- time = event.getTime()
- self.assertNotEqual(event.getCriticalTime(), None)
- critical_time = event.getCriticalTime()
- self.assertEqual(critical_time, time+3)
-
- # call with t < time < critical_time
- t = time - 10
- r = event(t)
- self.assertFalse(r)
- self.assertEquals(len(conn.mockGetNamedCalls("lock")), 0)
- self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0)
- self.assertEquals(len(conn.mockGetNamedCalls("close")), 0)
- self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 0)
- self.checkNoPacketSent(conn)
- self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0)
-
- # call with time < t < critical_time
- t = time + 1
- self.assertTrue(t < critical_time)
- r = event(t)
- self.assertFalse(r)
- self.assertEquals(len(conn.mockGetNamedCalls("lock")), 1)
- self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0)
- self.assertEquals(len(conn.mockGetNamedCalls("close")), 0)
- self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 1)
- self.checkNoPacketSent(conn)
- self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0)
-
- # call with time < critical_time < t
- t = critical_time + 5
- self.assertTrue(t > critical_time)
- r = event(t)
- self.assertTrue(r)
- self.assertEquals(len(conn.mockGetNamedCalls("lock")), 2)
- self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 1)
- self.assertEquals(len(conn.mockGetNamedCalls("close")), 1)
- self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 2)
- self.checkNoPacketSent(conn)
- self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 1)
-
if __name__ == '__main__':
unittest.main()
More information about the Neo-report
mailing list