root/NXLucene/trunk/src/nxlucene/server/threadpool.py

Revision 51815, 6.5 kB (checked in by rspivak, 2 years ago)

Merged pylucene2 branch changes r51513:51814 into the trunk.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
Line 
1 # Copyright (C) 2006, Nuxeo SAS <http://www.nuxeo.com>
2 # Author: Julien Anguenot <ja@nuxeo.com>
3 #
4 # This library is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU Lesser General Public
6 # License as published by the Free Software Foundation; either
7 # version 2.1 of the License, or (at your option) any later version.
8 #
9 # This library is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 # Lesser General Public License for more details.
13 #
14 # You should have received a copy of the GNU Lesser General Public
15 # License along with this library; if not, write to the Free Software
16 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-13
17 """A generic programming PyLucene thread pool
18
19 Highly inspired from :
20 http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871
21
22 Thanks Tim Lesher
23
24 $Id$
25 """
26 import gc
27 from time import sleep
28 import threading
29 import logging
30 from PyLucene import PythonThread
31
32 logger = logging.getLogger("NXLucene.ThreadPool")
33
34 class ThreadPool:
35
36     """Flexible thread pool class.  Creates a pool of threads, then
37     accepts tasks that will be dispatched to the next available
38     thread."""
39
40     def __init__(self, numThreads):
41
42         """Initialize the thread pool with numThreads workers."""
43
44         self.__threads = []
45         self.__resizeLock = threading.Condition(threading.Lock())
46         self.__taskLock = threading.Condition(threading.Lock())
47         self.__tasks = []
48         self.__isJoining = False
49         self.setThreadCount(numThreads)
50
51     def setThreadCount(self, newNumThreads):
52
53         """ External method to set the current pool size.  Acquires
54         the resizing lock, then calls the internal version to do real
55         work."""
56
57         # Can't change the thread count if we're shutting down the pool!
58         if self.__isJoining:
59             return False
60
61         self.__resizeLock.acquire()
62         try:
63             self.__setThreadCountNolock(newNumThreads)
64         finally:
65             self.__resizeLock.release()
66         return True
67
68     def __setThreadCountNolock(self, newNumThreads):
69
70         """Set the current pool size, spawning or terminating threads
71         if necessary.  Internal use only; assumes the resizing lock is
72         held."""
73
74         # If we need to grow the pool, do so
75         while newNumThreads > len(self.__threads):
76             newThread = ThreadPoolThread(self)
77             self.__threads.append(newThread)
78             newThread.start()
79         # If we need to shrink the pool, do so
80         while newNumThreads < len(self.__threads):
81             self.__threads[0].goAway()
82             del self.__threads[0]
83
84     def getThreadCount(self):
85
86         """Return the number of threads in the pool."""
87
88         self.__resizeLock.acquire()
89         try:
90             return len(self.__threads)
91         finally:
92             self.__resizeLock.release()
93
94     def queueTask(self, task, args=None, taskCallback=None):
95
96         """Insert a task into the queue.  task must be callable;
97         args and taskCallback can be None."""
98
99         logger.debug("ThreadPool -> gc.garbage : %s" % str(gc.garbage))
100
101         if self.__isJoining == True:
102             return False
103         if not callable(task):
104             return False
105
106         self.__taskLock.acquire()
107         try:
108             self.__tasks.append((task, args, taskCallback))
109             return True
110         finally:
111             self.__taskLock.release()
112
113     def getNextTask(self):
114
115         """ Retrieve the next task from the task queue.  For use
116         only by ThreadPoolThread objects contained in the pool."""
117
118         self.__taskLock.acquire()
119         try:
120             if self.__tasks == []:
121                 return (None, None, None)
122             else:
123                 return self.__tasks.pop(0)
124         finally:
125             self.__taskLock.release()
126
127     def joinAll(self, waitForTasks = True, waitForThreads = True):
128
129         """ Clear the task queue and terminate all pooled threads,
130         optionally allowing the tasks and threads to finish."""
131
132         # Mark the pool as joining to prevent any more task queueing
133         self.__isJoining = True
134
135         # Wait for tasks to finish
136         if waitForTasks:
137             while self.__tasks != []:
138                 sleep(0.1)
139
140         # Tell all the threads to quit
141         self.__resizeLock.acquire()
142         try:
143             # Wait until all threads have exited
144             if waitForThreads:
145                 for t in self.__threads:
146                     t.goAway()
147                 for t in self.__threads:
148                     t.join()
149                     # print t,"joined"
150                     del t
151             self.__setThreadCountNolock(0)
152             self.__isJoining = True
153
154             # Reset the pool for potential reuse
155             self.__isJoining = False
156         finally:
157             self.__resizeLock.release()
158
159 class ThreadPoolThread(PythonThread):
160
161     """ Pooled thread class. """
162
163     threadSleepTime = 0.1
164
165     def __init__(self, pool):
166
167         """ Initialize the thread and remember the pool. """
168
169         PythonThread.__init__(self)
170         self.__pool = pool
171         self.__isDying = False
172
173     def run(self):
174
175         """ Until told to quit, retrieve the next task and execute
176         it, calling the callback if any.  """
177
178         while self.__isDying == False:
179             cmd, args, callback = self.__pool.getNextTask()
180             # If there's nothing to do, just sleep a bit
181             if cmd is None:
182                 sleep(ThreadPoolThread.threadSleepTime)
183             elif callback is None:
184                 logger.info(
185                     "Worker thread %s starts execution ...." % self.getName())
186                 cmd(*args)
187                 gc.collect()
188                 del gc.garbage[:]
189                 logger.debug("gc.garbage __len__ ..........................."
190                              " %s" %str(len(gc.garbage)))
191                 logger.info(
192                     "Worker thread %s execution is done ...." % self.getName())
193             else:
194                 logger.info(
195                     "Worker thread %s starts execution ...." % self.getName())
196                 callback(cmd(*args))
197                 gc.collect()
198                 del gc.garbage[:]
199                 logger.debug("gc.garbage __len__ ..........................."
200                              " %s" %str(len(gc.garbage)))
201                 logger.info(
202                     "Worker thread %s execution is done ...." % self.getName())
203
204     def goAway(self):
205
206         """ Exit the run loop next time through."""
207
208         self.__isDying = True
Note: See TracBrowser for help on using the browser.