delayedresult.py
16.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
"""
This module supports the thread-safe, asynchronous transmission of data
('delayed results') from a worker (non-GUI) thread to the main thread. I.e., you don't
need to mutex lock any data, the worker thread doesn't wait (or even check)
for the result to be received, and the main thread doesn't wait for the
worker thread to send the result. Instead, the consumer will be called
automatically by the wx app when the worker thread result is available.
In most cases you just need to use startWorker() with the correct parameters
(your worker function and your 'consumer' in the simplest of cases). The
only requirement on consumer is that it must accept a DelayedResult instance
as first arg.
In the following example, this will call consumer(delayedResult) with the
return value from workerFn::
from delayedresult import startWorker
startWorker(consumer, workerFn)
More advanced uses:
- The other parameters to startWorker()
- Derive from Producer to override _extraInfo (e.g. to provide traceback info)
- Create your own worker-function-thread wrapper instead of using Producer
- Create your own Handler-like wrapper to pre- or post-process the result
(see PreProcessChain)
- Derive from Sender to use your own way of making result hop over the
"thread boundary" (from non-main thread to main thread), e.g. using Queue
Thanks to Josiah Carlson for critical feedback/ideas that helped me
improve this module.
:Copyright: (c) 2006 by Oliver Schoenborn
:License: wxWidgets license
:Version: 1.0
"""
# Contact address of the module author (obfuscated e-mail).
__author__ = 'Oliver Schoenborn at utoronto dot ca'
# Module version string.
__version__ = '1.0'
# Public names exported by ``from <module> import *``.
# NOTE(review): AbortedException and AbortEvent are defined in this module
# but are not listed here -- confirm whether that omission is intentional.
__all__ = ('Sender', 'SenderNoWx', 'SenderWxEvent', 'SenderCallAfter',
'Handler', 'DelayedResult', 'Producer', 'startWorker', 'PreProcessChain')
import wx
import threading
import traceback
class Struct:
    """
    Simple attribute bag: every keyword argument given to the constructor
    becomes an instance attribute of the same name. For example,
    ss = Struct(a=1, b='b') gives ss.a == 1 and ss.b == 'b'.
    """

    def __init__(self, **kwargs):
        """Store each keyword argument as an attribute of this instance."""
        attributes = vars(self)
        for attrName, attrValue in kwargs.items():
            # Write straight into the instance dictionary, one entry at a time.
            attributes[attrName] = attrValue
class Handler:
    """
    Callable wrapper that pre-binds positional and keyword arguments to a
    'listener'. Calling the instance as `handler(result, **kwargs)` invokes
    `listener(result, *args, **combinedKwargs)` where *args* are the ones
    given at construction and *combinedKwargs* merges the call-time kwargs
    with the construction-time kwargs. The listener's return value is
    returned unchanged.
    """

    def __init__(self, listener, *args, **kwargs ):
        """Remember the listener and the arguments to bind to it."""
        self.__listener = listener
        self.__args = args
        self.__kwargs = kwargs

    def __call__(self, result, **moreKwargs):
        """Call the listener with result as first argument, followed by the
        bound positional arguments and the merged keyword arguments. When a
        key is present both in moreKwargs and in the kwargs given at
        construction, the construction-time value is the one used."""
        merged = dict(moreKwargs)
        # Construction-time kwargs are applied last, so they take precedence.
        merged.update(self.__kwargs)
        return self.__listener(result, *self.__args, **merged)
class Sender:
    """
    Base class for the various kinds of senders. A sender wraps a value
    (or an exception) produced by a worker function into a DelayedResult
    and hands it to _sendImpl(), which derived classes implement to deliver
    it to a result handler (listener).

    Each sender can be given a "job id". It can be anything (number,
    string, object, etc); it is not interpreted here, it is simply attached
    to every DelayedResult created, so the receiver can tell which sender a
    result came from. Uniqueness is not required.

    Derive from this class if none of the existing derived classes are
    adequate, and override _sendImpl().
    """

    def __init__(self, jobID=None):
        """Store the optional jobID used to tag the results sent by this
        sender."""
        self.__jobID = jobID

    def getJobID(self):
        """Return the jobID that was given at construction."""
        return self.__jobID

    def sendResult(self, result):
        """Wrap result in a DelayedResult tagged with our jobID and deliver
        it through _sendImpl()."""
        self._sendImpl(DelayedResult(result, jobID=self.__jobID))

    def sendException(self, exception, extraInfo = None, originalTb = None):
        """Use this when the worker function raised an exception.

        *exception* is the exception instance that was caught (must not be
        None). *extraInfo* can be anything (e.g. locals, traceback); it is
        stored as the DelayedResult's result and *originalTb* as its
        original traceback. The DelayedResult is then delivered through
        _sendImpl(); the exception gets raised when DelayedResult.get() is
        called."""
        assert exception is not None
        wrapped = DelayedResult(
            extraInfo,
            exception=exception,
            jobID=self.__jobID,
            originalTb=originalTb)
        self._sendImpl(wrapped)

    def _sendImpl(self, delayedResult):
        """Deliver delayedResult to the handler. Must be overridden; this
        base implementation always raises NotImplementedError."""
        raise NotImplementedError(
            '_sendImpl() must be implemented in %s' % self.__class__)
class SenderNoWx( Sender ):
    """
    Sender that does not use wx at all: _sendImpl() calls the consumer
    directly, so the consumer runs in whatever thread sends the result
    (i.e. "in the worker thread"). It should therefore only be used for
    testing.
    """

    def __init__(self, consumer, jobID=None, args=(), kwargs={}):
        """The consumer can be any callable of the form
        `callable(result, *args, **kwargs)`. When args or kwargs are given
        they are bound to the consumer through a Handler."""
        Sender.__init__(self, jobID)
        target = consumer
        if args or kwargs:
            # Only wrap when there is something to bind.
            target = Handler(consumer, *args, **kwargs)
        self.__consumer = target

    def _sendImpl(self, delayedResult):
        """Call the consumer synchronously with the delayed result."""
        self.__consumer(delayedResult)
class SenderWxEvent( Sender ):
    """
    This sender delivers the delayed result by posting a wx event of class
    *eventClass* to an event handler with wx.PostEvent. The delayed result
    is stored as an attribute of the event (named "delayedResult" by
    default).
    """

    def __init__(self, handler, eventClass, resultAttr="delayedResult",
                 jobID=None, **kwargs):
        """The handler must derive from wx.EvtHandler, otherwise ValueError
        is raised. The event class is typically the first item in the pair
        returned by wx.lib.newevent.NewEvent(). *resultAttr* is the name of
        the event attribute that will hold the delayed result. Any extra
        keyword arguments are passed to the event class constructor each
        time a result is sent."""
        Sender.__init__(self, jobID)
        if not isinstance(handler, wx.EvtHandler):
            prefix = 'SenderWxEvent(handler=%s, ...) not allowed,' % type(handler)
            raise ValueError('%s handler must derive from wx.EvtHandler' % prefix)
        self.__consumer = Struct(
            handler=handler,
            eventClass=eventClass,
            resultAttr=resultAttr,
            kwargs=kwargs)

    def _sendImpl(self, delayedResult):
        """Build the event and post it to the handler.

        Must not modify the consumer (that was created at construction)
        since it might be shared by several senders, each sending from
        separate threads; hence the event kwargs are copied first."""
        target = self.__consumer
        eventAttrs = dict(target.kwargs)
        eventAttrs[target.resultAttr] = delayedResult
        wx.PostEvent(target.handler, target.eventClass(**eventAttrs))
class SenderCallAfter( Sender ):
    """
    This sender hands the delayed result produced in the worker thread to
    a callable by scheduling the call with wx.CallAfter.
    """

    def __init__(self, listener, jobID=None, args=(), kwargs={}):
        """The listener is any callable of the form
        `listener(delayedResult, *args, **kwargs)`. When args or kwargs are
        given they are bound to the listener through a Handler."""
        Sender.__init__(self, jobID)
        target = listener
        if args or kwargs:
            # Only wrap when there is something to bind.
            target = Handler(listener, *args, **kwargs)
        self.__consumer = target

    def _sendImpl(self, delayedResult):
        """Schedule `consumer(delayedResult)` via wx.CallAfter."""
        wx.CallAfter(self.__consumer, delayedResult)
class DelayedResult:
    """
    Represent the actual delayed result coming from the non-main thread.
    An instance of this is given to the result handler. It holds either
    (a reference to) the value sent, or an exception. If the latter, the
    exception is raised when the get() method gets called.
    """

    def __init__(self, result, jobID=None, exception = None, originalTb = None):
        """You should never have to call this yourself. A DelayedResult
        is created by a concrete Sender for you.

        *result* is the value to return from get(), or, when *exception*
        is given, the extra information to attach to that exception.
        *originalTb* is whatever traceback information the sender supplied
        along with the exception."""
        self.__result = result
        self.__exception = exception
        self.__original_traceback = originalTb
        self.__jobID = jobID

    def getJobID(self):
        """Return the jobID given when Sender initialized,
        or None if none given. """
        return self.__jobID

    def get(self):
        """Get the result. If an exception was sent instead of a result
        (via Sender's sendException()), that **exception is raised**, with
        two attributes set on it: 'extraInfo' (the extra information given
        to the sender) and 'originalTraceback' (the original traceback
        information). Otherwise, the result is simply returned.
        """
        # Compare against None rather than relying on truthiness: an
        # exception object may legitimately evaluate as false (e.g. if its
        # class defines __len__ or __bool__) and must still be raised.
        if self.__exception is not None:
            self.__exception.extraInfo = self.__result
            self.__exception.originalTraceback = self.__original_traceback
            raise self.__exception

        return self.__result
class AbortedException(Exception):
    """
    Signal that the worker function was aborted. Raise this in your worker
    function so that the sender knows not to send a result to the handler.
    """
class Producer(threading.Thread):
    """
    Represent the worker thread that produces delayed results.
    It causes the given function to run in a separate thread,
    and a sender to be used to send the return value of the function.
    As with any threading.Thread, instantiate and call start().
    Note that if the workerFn raises AbortedException, the result is not
    sent and the thread terminates gracefully.
    """

    def __init__(self, sender, workerFn, args=(), kwargs={},
                 name=None, group=None, daemon=False,
                 sendReturn=True, senderArg=None):
        """The sender will send the return value of
        `workerFn(*args, **kwargs)` to the main thread. The name and group
        are same as threading.Thread constructor parameters. A true daemon
        value marks the thread as a daemon thread. If sendReturn is False,
        then the return value of workerFn() will not be sent. If senderArg
        is given, it must be the name of the keyword arg to use to pass the
        sender into the workerFn, so the function can send (typically many)
        results.

        If workerFn raises any other Exception, it is forwarded with
        sender.sendException() together with the result of _extraInfo()
        and the formatted traceback text."""
        if senderArg:
            # Work on a private copy: writing into the dict we were given
            # would pollute the shared default argument (or the caller's
            # own dict) and leak this sender into later Producers.
            kwargs = dict(kwargs)
            kwargs[senderArg] = sender

        def wrapper():
            try:
                result = workerFn(*args, **kwargs)
            except AbortedException:
                # Deliberate abort: nothing is sent.
                pass
            except Exception as exc:
                # format_exc() must be called here, while the exception is
                # still being handled.
                originalTb = traceback.format_exc()
                extraInfo = self._extraInfo(exc)
                sender.sendException(exc, extraInfo, originalTb)
            else:
                if sendReturn:
                    sender.sendResult(result)

        threading.Thread.__init__(self, name=name, group=group, target=wrapper)
        if daemon:
            # Attribute form of the deprecated setDaemon().
            self.daemon = daemon

    def _extraInfo(self, exception):
        """This method could be overridden in a derived class to provide
        extra information when an exception is being sent instead of a
        result. The base implementation returns None."""
        return None
class AbortEvent:
    """
    Convenience class that represents a kind of threading.Event that
    raises AbortedException when called (see the __call__ method, everything
    else is just to make it look like threading.Event).
    """

    def __init__(self):
        """Create the wrapped threading.Event, initially not set."""
        self.__ev = threading.Event()

    def __call__(self, timeout=None):
        """See if event has been set (wait at most timeout if given). If so,
        raise AbortedException. Otherwise return None. Allows you to do
        'while not event():' which will always succeed unless the event
        has been set (then AbortedException will cause while to exit)."""
        if timeout:
            self.__ev.wait(timeout)
        if self.__ev.is_set():
            raise AbortedException()
        return None

    def __getattr__(self, name):
        """This allows us to be a kind of threading.Event: 'set', 'clear',
        'wait' and 'isSet'/'is_set' are forwarded to the wrapped event.
        Any other missing attribute raises AttributeError."""
        if name in ('set', 'clear', 'wait'):
            return getattr(self.__ev, name)
        if name in ('isSet', 'is_set'):
            # Serve the legacy camelCase spelling through is_set() so we do
            # not depend on the deprecated threading.Event.isSet alias.
            return self.__ev.is_set
        # Falling off the end would return None for *every* unknown name,
        # making hasattr() always true and hiding typos.
        raise AttributeError(
            '%s instance has no attribute %r' % (self.__class__.__name__, name))
def startWorker(
    consumer, workerFn,
    cargs=(), ckwargs={},
    wargs=(), wkwargs={},
    jobID=None, group=None, daemon=False,
    sendReturn=True, senderArg=None):
    """
    Convenience function to send data produced by `workerFn(*wargs, **wkwargs)`
    running in separate thread, to a
    `consumer(delayedResult, *cargs, **ckwargs)` running in the main thread.
    This function merely creates a SenderCallAfter (or a SenderWxEvent, if
    consumer derives from wx.EvtHandler), and a Producer, and returns
    immediately after starting the Producer thread.

    When consumer derives from wx.EvtHandler, `cargs[0]` must be the event
    class to instantiate and ckwargs are passed on to SenderWxEvent as
    extra keyword arguments.

    The jobID is used for the Sender and as name for the Producer thread.
    The group, daemon, sendReturn and senderArg parameters are passed on to
    the Producer unchanged. Returns the thread created, in case caller
    needs join/etc.
    """
    if isinstance(consumer, wx.EvtHandler):
        eventClass = cargs[0]
        sender = SenderWxEvent(consumer, eventClass, jobID=jobID, **ckwargs)
    else:
        sender = SenderCallAfter(consumer, jobID, args=cargs, kwargs=ckwargs)

    # Give the Producer its own copy of the worker kwargs: it may add the
    # sender under the senderArg key, and that must never end up in our
    # shared default dict or in the caller's dict, where it would leak into
    # later, unrelated calls.
    thread = Producer(
        sender, workerFn, args=wargs, kwargs=dict(wkwargs),
        name=jobID, group=group, daemon=daemon,
        senderArg=senderArg, sendReturn=sendReturn)

    thread.start()
    return thread
class PreProcessChain:
    """
    Represent a 'delayed result pre-processing chain', a kind of Handler.
    Useful when lower-level objects need to apply a sequence of transformations
    to the delayed result before handing it over to a final handler.
    This allows the starter of the worker function to not know
    anything about the lower-level objects.
    """

    def __init__(self, handler, *args, **kwargs):
        """Wrap `handler(result, *args, **kwargs)` so that the result
        it receives has been transformed by us. If handler is None, the
        first positional argument is taken to be an existing chain (a list
        whose first item is the final handler) and is used as is."""
        if handler is None:  # assume rhs is a chain
            self.__chain = args[0]
        else:
            if args or kwargs:
                handler = Handler(handler, *args, **kwargs)
            self.__chain = [handler]

    def addSub(self, callable, *args, **kwargs):
        """Add a sub-callable, ie a `callable(result, *args, **kwargs)`
        that returns a transformed result to the previously added
        sub-callable (or the handler given at construction, if this is
        the first call to addSub). """
        self.__chain.append( Handler(callable, *args, **kwargs) )

    def clone(self):
        """Clone the chain. Shallow only. Useful when several threads
        must be started but have different sub-callables. """
        return PreProcessChain(None, self.__chain[:] )

    def cloneAddSub(self, callable, *args, **kwargs):
        """Convenience method that first clones self, then calls addSub()
        on that clone with given arguments. Returns the clone; self is
        left unchanged. """
        cc = self.clone()
        cc.addSub(callable, *args, **kwargs)
        # Without this return the freshly built clone would be discarded.
        return cc

    def count(self):
        """How many callables are in the chain (the final handler
        included)."""
        return len(self.__chain)

    class Traverser:
        """
        Traverses the chain of pre-processors it is given, transforming
        the original delayedResult along the way. The return value of each
        callable added via addSub() is given to the previous addSub() callable,
        until the handler is reached.
        """

        def __init__(self, delayedResult, chain):
            """*chain* is the list of sub-callables still to apply; it is
            consumed (modified in place) by get()."""
            self.__dr = delayedResult
            self.__chain = chain

        def get(self):
            """This makes handler think we are a delayedResult. Pops the
            next sub-callable off the chain and returns what it returns
            when called with us; once the chain is empty, returns the
            real delayed result's get()."""
            if not self.__chain:
                return self.__dr.get()

            handler = self.__chain[0]
            del self.__chain[0]
            return handler(self)

        def getJobID(self):
            """Return the job id for the delayedResult we transform."""
            return self.__dr.getJobID()

    def __call__(self, delayedResult):
        """This makes us a Handler. We just call handler(Traverser). The
        handler will think it is getting a delayed result, but in fact
        will be getting an instance of Traverser, which will take care
        of properly applying the chain of transformations to delayedResult.
        The handler's return value is returned, as Handler does."""
        # The Traverser gets a copy of the sub-callables (slice), so that
        # consuming it leaves our own chain intact for the next call.
        chainTrav = self.Traverser(delayedResult, self.__chain[1:])
        handler = self.__chain[0]
        return handler( chainTrav )