_threads.py
3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
###############################################################################
# Name: _threads.py #
# Purpose: Threadpool implementation #
# Author: Cody Precord <cprecord@editra.org> #
# Copyright: (c) 2011 Cody Precord <staff@editra.org> #
# Licence: wxWindows Licence #
###############################################################################
"""
Editra Business Model Library: ThreadPool
"""
__author__ = "Cody Precord <cprecord@editra.org>"
__svnid__ = "$Id: _threads.py 67422 2011-04-09 17:23:27Z CJP $"
__revision__ = "$Revision: 67422 $"
__all__ = [ 'ThreadPool', ]
#-----------------------------------------------------------------------------#
# Imports
import logging
import threading
import Queue
#-----------------------------------------------------------------------------#
class ThreadPool(object):
    """Object for managing a collection of threads and dispatching jobs
    to them.

    All worker threads are created when the pool is constructed and share
    a single job queue.

    """
    def __init__(self, tcount, qsize=-1):
        """Create the ThreadPool
        @param tcount: max number of threads to keep in the pool
        @keyword qsize: size of job queue (-1 for unlimited)

        """
        super(ThreadPool, self).__init__()

        # Attributes
        self._poolsize = tcount
        self._jobs = Queue.Queue(qsize)
        # Every worker is handed the same queue object, so a queued job is
        # picked up by whichever worker calls get() on it first.
        self._threads = [_WorkerThread(self._jobs)
                         for _ in range(self._poolsize)]

    # Number of worker threads the pool was created with
    ThreadCount = property(lambda self: self._poolsize)
    # Number of jobs still waiting in the queue (Queue.qsize() is only an
    # approximation while other threads are adding/removing items)
    JobCount = property(lambda self: self._jobs.qsize())

    def QueueJob(self, funct, *args, **kwargs):
        """Add a job to be processed
        @param funct: callable
        @param args: list of any positional arguments to funct
        @param kwargs: map of any keyword arguments to funct
        @raise TypeError: funct is not callable
        @note: blocks while the job queue is full, which can only happen
               when the pool was created with a positive qsize

        """
        # Explicit check instead of an assert: asserts are stripped when
        # Python runs with -O, which would let a bad job reach a worker.
        if not callable(funct):
            raise TypeError("funct must be callable, got %r" % (funct,))
        self._jobs.put((funct, args, kwargs))

    def Shutdown(self):
        """Wait for the ThreadPool to finish all outstanding work
        @note: Blocking call until every queued job has been processed.
               This only joins the job queue; it does not stop the
               worker threads.

        """
        self._jobs.join()
#-----------------------------------------------------------------------------#
class _WorkerThread(threading.Thread):
    """Worker thread class to be used by the ThreadPool"""
    def __init__(self, jobs):
        """Create the Thread object and start it running
        @param jobs: Queue object supplying (funct, args, kwargs) tuples

        """
        super(_WorkerThread, self).__init__()

        # Attributes
        self._jobs = jobs

        # Daemon thread: run() never returns, so a non-daemon worker would
        # keep the interpreter alive forever at exit.
        self.daemon = True
        self.start()

    def run(self):
        """Take jobs off the queue and execute them, forever.
        @note: an exception raised by a job is logged and the worker moves
               on to the next job; there is no way to ask the loop to exit

        """
        while True:
            job = self._jobs.get()
            try:
                # Unpack inside the try so a malformed queue item is
                # handled like any other failed job instead of killing
                # the thread before task_done() can be called.
                funct, args, kwargs = job
                funct(*args, **kwargs)
            except Exception:
                # A failing job must not take the worker down, but the
                # traceback is recorded so the failure is not invisible.
                logging.getLogger(__name__).exception(
                    "Unhandled exception in ThreadPool job")
            finally:
                # Always balance the get() so Queue.join() can return
                self._jobs.task_done()
#-----------------------------------------------------------------------------#
# Unittest
if __name__ == '__main__':
    # Manual smoke test: push more jobs than there are workers and wait
    # for all of them to complete.
    import random
    import time

    WORKERS = 5
    JOBS = 8

    def TimedJob(id_, length):
        """Announce the job, sleep for length seconds, announce the end"""
        print("JOB: %d, begin" % id_)
        time.sleep(length)
        print("JOB: %d, end" % id_)

    pool = ThreadPool(WORKERS)

    print("Start Jobs")
    for jobnum in range(JOBS):
        duration = random.randint(1, 20)
        pool.QueueJob(TimedJob, jobnum, duration)
    print("All Jobs Queued")

    pool.Shutdown() # blocks until every queued job has been processed
    print("All Done!")