# delayedresult.py
"""
This module supports the thread-safe, asynchronous transmission of data
('delayed results') from a worker (non-GUI) thread to the main thread. That
is, you don't need to mutex-lock any data, the worker thread doesn't wait
(or even check) for the result to be received, and the main thread doesn't
wait for the worker thread to send the result. Instead, the consumer will be
called automatically by the wx app when the worker thread result is available.

In most cases you just need to use startWorker() with the correct parameters
(your worker function and your 'consumer' in the simplest of cases). The 
only requirement on consumer is that it must accept a DelayedResult instance 
as first arg. 

In the following example, this will call consumer(delayedResult) with the 
return value from workerFn::

    from delayedresult import startWorker
    startWorker(consumer, workerFn)

More advanced uses: 

- The other parameters to startWorker()
- Derive from Producer to override _extraInfo (e.g. to provide traceback info)
- Create your own worker-function-thread wrapper instead of using Producer
- Create your own Handler-like wrapper to pre- or post-process the result 
  (see PreProcessChain)
- Derive from Sender to use your own way of making result hop over the 
  "thread boundary" (from non-main thread to main thread), e.g. using Queue

Thanks to Josiah Carlson for critical feedback/ideas that helped me 
improve this module. 

:Copyright: (c) 2006 by Oliver Schoenborn
:License: wxWidgets license
:Version: 1.0

"""

# Module metadata.
__author__ = 'Oliver Schoenborn at utoronto dot ca'
__version__ = '1.0'

# Names made available by a star-import of this module.
__all__ = (
    'Sender',
    'SenderNoWx',
    'SenderWxEvent',
    'SenderCallAfter',
    'Handler',
    'DelayedResult',
    'Producer',
    'startWorker',
    'PreProcessChain',
)


import wx
import threading
import traceback


class Struct:
    """
    Simple attribute bag: every keyword argument given to the constructor
    becomes an attribute of the instance. For example, after
    ``ss = Struct(a=1, b='b')`` both ``ss.a == 1`` and ``ss.b == 'b'`` hold.
    """

    def __init__(self, **kwargs):
        """Store each keyword argument as an instance attribute."""
        for attrName, attrValue in kwargs.items():
            self.__dict__[attrName] = attrValue


class Handler:
    """
    Callable wrapper that pre-binds positional and keyword arguments to a
    'listener'. Calling the instance as `handler(result, **kwargs)` invokes
    `listener(result, *args, **combined)` where `args` are the positional
    arguments given at construction and `combined` merges the call-time
    kwargs with the construction-time kwargs (the latter win on a key
    clash). The listener's return value is passed back to the caller.
    """
    def __init__(self, listener, *args, **kwargs ):
        """Remember the listener and the arguments to bind to it."""
        self.__listener = listener
        self.__args = args
        self.__kwargs = kwargs

    def __call__(self, result, **moreKwargs):
        """Call the listener with `result` first, then the bound `*args`,
        then the union of moreKwargs and the bound kwargs; return whatever
        the listener returns."""
        if not moreKwargs:
            # Nothing extra supplied: the bound kwargs are used as they are.
            return self.__listener(result, *self.__args, **self.__kwargs)

        combined = dict(moreKwargs)
        # Applied last, so construction-time values override call-time ones.
        combined.update(self.__kwargs)
        return self.__listener(result, *self.__args, **combined)

        
class Sender:
    """
    Common base for all senders. A sender takes what a worker function
    produced and delivers it to a result handler (listener), wrapped in a
    DelayedResult.

    Each sender may carry a "job id". It can be any object (number,
    string, ...); the sender itself never interprets it, it is only copied
    onto every DelayedResult created so that the receiver can tell which
    sender a result came from. Job ids do not have to be unique.

    To create a new kind of sender, derive from this class and override
    _sendImpl().
    """

    def __init__(self, jobID=None):
        """Remember the optional jobID, used to tag the results sent."""
        self.__jobID = jobID

    def getJobID(self):
        """Return the jobID that was given at construction."""
        return self.__jobID

    def sendResult(self, result):
        """Wrap *result* in a DelayedResult and deliver it through the
        mechanism implemented by the derived class."""
        self._sendImpl(DelayedResult(result, jobID=self.__jobID))

    def sendException(self, exception, extraInfo = None, originalTb = None):
        """Deliver an exception raised by the worker function instead of a
        result. *exception* is the exception instance that was caught;
        *extraInfo* can be anything (e.g. locals or a traceback) and
        *originalTb* is the original traceback information. All three are
        stored in the DelayedResult handed to the handler, which raises
        the exception when its get() is called."""
        assert exception is not None
        self._sendImpl(DelayedResult(
            extraInfo,
            exception=exception,
            jobID=self.__jobID,
            originalTb=originalTb))

    def _sendImpl(self, delayedResult):
        """Delivery hook; derived classes must override it."""
        raise NotImplementedError(
            '_sendImpl() must be implemented in %s' % self.__class__)
        
    
class SenderNoWx( Sender ):
    """
    Sender that does not involve wx at all: the consumer is called
    directly from _sendImpl(), i.e. in whatever thread does the sending
    (the worker thread). Intended for testing only.
    """
    def __init__(self, consumer, jobID=None, args=(), kwargs={}):
        """The consumer can be any callable of the form
        `callable(result, *args, **kwargs)`."""
        Sender.__init__(self, jobID)
        target = consumer
        if args or kwargs:
            # Extra arguments were supplied: bind them to the consumer.
            target = Handler(consumer, *args, **kwargs)
        self.__consumer = target

    def _sendImpl(self, delayedResult):
        """Hand the delayed result straight to the consumer."""
        self.__consumer(delayedResult)
        

class SenderWxEvent( Sender ):
    """
    Sender that delivers the delayed result by posting a wx event of class
    *eventClass* to an event handler, using wx.PostEvent. The delayed
    result is stored as an attribute of the event (named "delayedResult"
    by default).
    """
    def __init__(self, handler, eventClass, resultAttr="delayedResult", 
        jobID=None, **kwargs):
        """The handler must derive from wx.EvtHandler, otherwise ValueError
        is raised. The event class is typically the first item of the pair
        returned by wx.lib.newevent.NewEvent(). *resultAttr* is the name
        of the event attribute that will hold the delayed result. Any
        additional keyword arguments are passed to the event class every
        time an event is created."""
        Sender.__init__(self, jobID)
        if not isinstance(handler, wx.EvtHandler):
            prefix = 'SenderWxEvent(handler=%s, ...) not allowed,' % type(handler)
            raise ValueError('%s handler must derive from wx.EvtHandler' % prefix)
        self.__consumer = Struct(
            handler=handler,
            eventClass=eventClass,
            resultAttr=resultAttr,
            kwargs=kwargs)

    def _sendImpl(self, delayedResult):
        """Create the event and post it to the handler. The consumer record
        built at construction must not be modified here, since it might be
        shared by several senders, each sending from a separate thread;
        hence the event arguments are assembled in a fresh dictionary."""
        target = self.__consumer
        eventKwargs = dict(target.kwargs)
        eventKwargs[target.resultAttr] = delayedResult
        wx.PostEvent(target.handler, target.eventClass(**eventKwargs))


class SenderCallAfter( Sender ):
    """
    Sender that delivers the delayed result produced in the worker thread
    to a callable, by scheduling the call with wx.CallAfter.
    """
    def __init__(self, listener, jobID=None, args=(), kwargs={}):
        """The listener can be any callable of the form
        `callable(result, *args, **kwargs)`."""
        Sender.__init__(self, jobID)
        target = listener
        if args or kwargs:
            # Extra arguments were supplied: bind them to the listener.
            target = Handler(listener, *args, **kwargs)
        self.__consumer = target

    def _sendImpl(self, delayedResult):
        """Schedule the consumer call via wx.CallAfter."""
        wx.CallAfter(self.__consumer, delayedResult)
        

class DelayedResult:
    """
    Represent the actual delayed result coming from the non-main thread.
    An instance of this is given to the result handler. It holds either
    (a reference to) the value that was sent, or an exception. In the
    latter case the exception is raised when the get() method is called.
    """

    def __init__(self, result, jobID=None, exception = None, originalTb = None):
        """You should never have to call this yourself. A DelayedResult
        is created by a concrete Sender for you.

        *result* is the value sent or, when *exception* is given, the
        extra information to attach to that exception. *originalTb* is the
        original traceback information to attach to the exception."""
        self.__result = result
        self.__exception = exception
        self.__original_traceback = originalTb
        self.__jobID = jobID

    def getJobID(self):
        """Return the jobID given at construction, or None if none given."""
        return self.__jobID

    def get(self):
        """Get the result. If an exception was sent instead of a result
        (via Sender's sendException()), that **exception is raised**; its
        'extraInfo' attribute is set to the extra information that was
        sent along and its 'originalTraceback' attribute to the original
        traceback.

        Otherwise, the result is simply returned.
        """
        # Compare against None rather than relying on truthiness: an
        # exception class may define __bool__/__len__ and be falsy, and
        # must still be raised instead of silently returning extraInfo.
        if self.__exception is not None:
            self.__exception.extraInfo = self.__result
            self.__exception.originalTraceback = self.__original_traceback
            raise self.__exception

        return self.__result


class AbortedException(Exception):
    """Exception for a worker function to raise when it has been aborted,
    so that the sender knows not to send a result to the handler."""
    

class Producer(threading.Thread):
    """
    Represent the worker thread that produces delayed results.
    It causes the given function to run in a separate thread,
    and a sender to be used to send the return value of the function.
    As with any threading.Thread, instantiate and call start().
    Note that if the workerFn raises AbortedException, the result is not
    sent and the thread terminates gracefully. Any other Exception raised
    by workerFn is sent via the sender's sendException().
    """

    def __init__(self, sender, workerFn, args=(), kwargs={}, 
                 name=None, group=None, daemon=False, 
                 sendReturn=True, senderArg=None):
        """The sender will send the return value of
        `workerFn(*args, **kwargs)` to the main thread. The name and group
        are same as threading.Thread constructor parameters. If daemon is
        true, the thread is marked as a daemon thread. If sendReturn is
        False, then the return value of workerFn() will not be sent. If
        senderArg is given, it must be the name of the keyword arg to use
        to pass the sender into the workerFn, so the function can send
        (typically many) results.

        The kwargs dictionary given by the caller is never modified."""
        # Work on a private copy: inserting the sender into the dictionary
        # that was passed in would alter the caller's data or, worse, the
        # shared default value, leaking the sender into later Producers.
        kwargs = dict(kwargs)
        if senderArg:
            kwargs[senderArg] = sender

        def wrapper():
            """Run the worker function and route its outcome to sender."""
            try:
                result = workerFn(*args, **kwargs)
            except AbortedException:
                # Worker was aborted: deliberately send nothing.
                pass
            except Exception as exc:
                # Capture the traceback text while still in the handler.
                originalTb = traceback.format_exc()
                extraInfo = self._extraInfo(exc)
                sender.sendException(exc, extraInfo, originalTb)
            else:
                if sendReturn:
                    sender.sendResult(result)

        threading.Thread.__init__(self, name=name, group=group, target=wrapper)
        if daemon:
            self.daemon = daemon

    def _extraInfo(self, exception):
        """This method could be overridden in a derived class to provide
        extra information when an exception is being sent instead of a
        result. The default implementation provides none."""
        return None


class AbortEvent:
    """
    Convenience class that represents a kind of threading.Event that
    raises AbortedException when called (see the __call__ method, everything
    else is just to make it look like threading.Event).
    """

    def __init__(self):
        """Create the underlying threading.Event, initially not set."""
        self.__ev = threading.Event()

    def __call__(self, timeout=None):
        """See if event has been set (wait at most timeout if given).  If so,
        raise AbortedException. Otherwise return None. Allows you to do
        'while not event():' which will always succeed unless the event
        has been set (then AbortedException will cause while to exit)."""
        if timeout:
            self.__ev.wait(timeout)
        if self.__ev.is_set():
            raise AbortedException()
        return None

    def __getattr__(self, name):
        """This allows us to be a kind of threading.Event: the methods
        'set', 'clear', 'wait', 'isSet' and 'is_set' are forwarded to the
        underlying event. Any other name raises AttributeError."""
        if name == 'isSet':
            # Old camelCase spelling, served by the current method.
            return self.__ev.is_set
        if name in ('set', 'clear', 'wait', 'is_set'):
            return getattr(self.__ev, name)
        # Without this, every unknown attribute would silently be None.
        raise AttributeError('%s instance has no attribute %r'
                             % (self.__class__.__name__, name))


def startWorker(
    consumer, workerFn, 
    cargs=(), ckwargs={}, 
    wargs=(), wkwargs={},
    jobID=None, group=None, daemon=False, 
    sendReturn=True, senderArg=None):
    """
    Convenience function to send data produced by `workerFn(*wargs, **wkwargs)`
    running in separate thread, to a `consumer(*cargs, **ckwargs)` running in
    the main thread. This function merely creates a SenderCallAfter (or a
    SenderWxEvent, if consumer derives from wx.EvtHandler), and a Producer,
    and returns immediately after starting the Producer thread. The jobID
    is used for the Sender and as name for the Producer thread. Returns the
    thread created, in case caller needs join/etc.

    When consumer derives from wx.EvtHandler, the first item of cargs must
    be the event class to use and ckwargs are passed on to SenderWxEvent.
    The remaining parameters (group, daemon, sendReturn, senderArg) are
    passed to Producer unchanged. Neither ckwargs nor wkwargs is modified.
    """

    if isinstance(consumer, wx.EvtHandler):
        eventClass = cargs[0]
        sender = SenderWxEvent(consumer, eventClass, jobID=jobID, **ckwargs)
    else:
        sender = SenderCallAfter(consumer, jobID, args=cargs, kwargs=ckwargs)

    # Give the Producer its own copy of the worker kwargs: when senderArg
    # is used the sender gets inserted into that dictionary, which must
    # not alter the caller's dict nor this function's shared default.
    thread = Producer(
        sender, workerFn, args=wargs, kwargs=dict(wkwargs), 
        name=jobID, group=group, daemon=daemon, 
        senderArg=senderArg, sendReturn=sendReturn)

    thread.start() 
    return thread


class PreProcessChain:
    """
    Represent a 'delayed result pre-processing chain', a kind of Handler.
    Useful when lower-level objects need to apply a sequence of transformations
    to the delayed result before handing it over to a final handler.
    This allows the starter of the worker function to not know
    anything about the lower-level objects.
    """
    def __init__(self, handler, *args, **kwargs):
        """Wrap `handler(result, *args, **kwargs)` so that the result
        it receives has been transformed by us. If handler is None, the
        first positional argument is taken to be a ready-made chain (a
        list whose first item is the final handler); this form is used
        by clone()."""
        if handler is None:# assume rhs is a chain
            self.__chain = args[0]
        else:
            if args or kwargs:
                handler = Handler(handler, *args, **kwargs)
            self.__chain = [handler]

    def addSub(self, callable, *args, **kwargs):
        """Add a sub-callable, ie a `callable(result, *args, **kwargs)`
        that returns a transformed result to the previously added
        sub-callable (or the handler given at construction, if this is
        the first call to addSub). """
        self.__chain.append( Handler(callable, *args, **kwargs) )

    def clone(self):
        """Clone the chain. Shallow only. Useful when several threads
        must be started but have different sub-callables. """
        return PreProcessChain(None, self.__chain[:] )

    def cloneAddSub(self, callable, *args, **kwargs):
        """Convenience method that first clones self, then calls addSub()
        on that clone with given arguments. Returns the clone; self is
        left unchanged."""
        cc = self.clone()
        cc.addSub(callable, *args, **kwargs)
        # The clone is the whole point of this method: hand it back.
        return cc

    def count(self):
        """How many callables are in the chain, the final handler included."""
        return len(self.__chain)

    class Traverser:
        """
        Traverses the chain of pre-processors it is given, transforming
        the original delayedResult along the way. The return value of each
        callable added via addSub() is given to the previous addSub() callable,
        until the handler is reached.
        """
        def __init__(self, delayedResult, chain):
            """Keep the delayed result and the list of callables that
            remain to be applied; that list is consumed by get()."""
            self.__dr = delayedResult
            self.__chain = chain

        def get(self):
            """This makes handler think we are a delayedResult. Each call
            removes the next callable from the remaining chain and returns
            what it produces when called with this traverser; once the
            chain is exhausted, the real delayed result's get() is used."""
            if not self.__chain:
                return self.__dr.get()

            handler = self.__chain.pop(0)
            return handler(self)

        def getJobID(self):
            """Return the job id for the delayedResult we transform."""
            return self.__dr.getJobID()


    def __call__(self, delayedResult):
        """This makes us a Handler. We just call handler(Traverser). The
        handler will think it is getting a delayed result, but in fact
        will be getting an instance of Traverser, which will take care
        of properly applying the chain of transformations to delayedResult."""
        # The traverser gets its own list (a slice), so our chain is
        # untouched and can serve any number of results.
        chainTrav = self.Traverser(delayedResult, self.__chain[1:])
        handler = self.__chain[0]
        handler( chainTrav )