[Neo-report] r2453 vincent - in /trunk/neo: client/poll.py dispatcher.py

nobody at svn.erp5.org nobody at svn.erp5.org
Tue Nov 9 21:34:58 CET 2010


Author: vincent
Date: Tue Nov  9 21:34:58 2010
New Revision: 2453

Log:
Locking is required for a multithreaded start (showcased by ZODB testMT).

Move startup code to ThreadedPoll class.
Refcount-ish based would be better, so add a TODO (idea: browse existing
connections to see if there are pending responses, preventing thread
shutdown).

Modified:
    trunk/neo/client/poll.py
    trunk/neo/dispatcher.py

Modified: trunk/neo/client/poll.py
==============================================================================
--- trunk/neo/client/poll.py [iso-8859-1] (original)
+++ trunk/neo/client/poll.py [iso-8859-1] Tue Nov  9 21:34:58 2010
@@ -16,6 +16,7 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 from threading import Thread, Event, enumerate as thread_enum
+from neo.locking import Lock
 import neo
 
 class _ThreadedPoll(Thread):
@@ -60,6 +61,9 @@ class ThreadedPoll(object):
     _started = False
 
     def __init__(self, *args, **kw):
+        lock = Lock()
+        self._status_lock_acquire = lock.acquire
+        self._status_lock_release = lock.release
         self._args = args
         self._kw = kw
         self.newThread()
@@ -68,11 +72,32 @@ class ThreadedPoll(object):
         self._thread = _ThreadedPoll(*self._args, **self._kw)
 
     def start(self):
-        if self._started:
-            self.newThread()
-        else:
-            self._started = True
-        self._thread.start()
+        """
+        Start thread if not started or restart it if it's shutting down.
+        """
+        # TODO: a refcount-based approach would be better, but more intrusive.
+        self._status_lock_acquire()
+        try:
+            thread = self._thread
+            if thread.stopping():
+                # XXX: ideally, we should wake thread up here, to be sure not
+                # to wait forever.
+                thread.join()
+            if not thread.isAlive():
+                if self._started:
+                    self.newThread()
+                else:
+                    self._started = True
+                self._thread.start()
+        finally:
+            self._status_lock_release()
+
+    def stop(self):
+        self._status_lock_acquire()
+        try:
+            self._thread.stop()
+        finally:
+            self._status_lock_release()
 
     def __getattr__(self, key):
         return getattr(self._thread, key)

Modified: trunk/neo/dispatcher.py
==============================================================================
--- trunk/neo/dispatcher.py [iso-8859-1] (original)
+++ trunk/neo/dispatcher.py [iso-8859-1] Tue Nov  9 21:34:58 2010
@@ -66,19 +66,7 @@ class Dispatcher:
         return True
 
     def needPollThread(self):
-        thread = self.poll_thread
-        # If thread has been stopped, wait for it to stop
-        # Note: This is not, ironically, thread safe: if one thread is
-        # stopping poll thread while we are checking its state here, a
-        # race condition will occur. If safety is required, locks should
-        # be added to control the access to thread's "start", "stopping"
-        # and "stop" methods.
-        if thread.stopping():
-            # XXX: ideally, we should wake thread up here, to be sure not
-            # to wait forever.
-            thread.join()
-        if not thread.isAlive():
-            thread.start()
+        self.poll_thread.start()
 
     @giant_lock
     @profiler_decorator





More information about the Neo-report mailing list