aboutsummaryrefslogtreecommitdiff
blob: a09f35beb935cedc776f63e935e85c1777615dfb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# R overlay -- roverlay package, errorqueue
# -*- coding: utf-8 -*-
# Copyright (C) 2012 André Erdmann <dywi@mailerd.de>
# Distributed under the terms of the GNU General Public License;
# either version 2 of the License, or (at your option) any later version.

"""an error queue for safely stopping threads and unblocking queues"""

# names exported by "from <this module> import *"
__all__ = [ 'ErrorQueue', ]

import threading

class ErrorQueue ( object ):
   """Collects exceptions raised during (threaded) execution.

   Despite its name this is not a queue: pushed exceptions are stored in a
   list in push order. Pushing additionally sends an "unblock item" to every
   attached queue so that threads waiting on those queues can wake up.
   """

   def __init__ ( self, using_threads=True ):
      """Creates an empty error queue.

      arguments:
      * using_threads -- if false, push() re-raises the pushed exception
                         after recording it (defaults to True)
      """
      self.using_threads      = using_threads
      # lock-free hint, updated by push() and really_empty()
      self.empty              = True
      # ( context, error ) tuples in push order
      self._exceptions        = list()
      #  id -> queue, unblocking_item
      self._queues_to_unblock = dict()

      self._lock = threading.Lock()

   def really_empty ( self ):
      """Returns true if no exception stored. Uses a lock to ensure
      correctness of the result. Also refreshes the 'empty' attribute.
      """
      with self._lock:
         # return the value computed while holding the lock, not a
         # re-read of self.empty that another thread may have changed
         is_empty   = not self._exceptions
         self.empty = is_empty
      return is_empty

   def _unblock_queues ( self ):
      """Sends an unblock item to all attached queues.

      Does not acquire the lock; callers in this class hold it.
      """
      for q, v in self._queues_to_unblock.values():
         try:
            q.put_nowait ( v )
         except Exception:
            # best effort: a queue that cannot take the item (e.g. full)
            # must not prevent the remaining queues from being unblocked.
            # KeyboardInterrupt/SystemExit are deliberately not swallowed.
            pass

   def push ( self, context, error ):
      """Pushes an exception. This also triggers on-error mode, which
      unblocks all attached queues.

      arguments:
      * context -- the origin of the exception
      * error   -- the exception, or None for a no-op push that only
                   triggers on-error mode

      Raises: the pushed error if this object was created with
      using_threads=False and error is not None.
      """
      with self._lock:
         self._exceptions.append ( ( context, error ) )
         self.empty = False
         self._unblock_queues()

      # a no-op push (error=None) must not fail with
      # "TypeError: exceptions must derive from BaseException"
      if not self.using_threads and error is not None:
         raise error

   def unblock_queues ( self ):
      """Unblocks all attached queues."""
      with self._lock:
         self._unblock_queues()

   def attach_queue ( self, q, unblock_item ):
      """Attaches a queue. Nothing will be done with it, unless an exception
      is pushed to this ErrorQueue, in which case all attached queues will
      be unblocked which allows queue-waiting threads to end.

      Attaching the same queue object again replaces its unblock item.

      arguments:
      * q            -- queue
      * unblock_item -- item that is used for unblocking, e.g. 'None'
      """
      with self._lock:
         self._queues_to_unblock [id (q)] = ( q, unblock_item )

   def remove_queue ( self, q ):
      """Removes a queue. It will no longer receive an unblock item if
      on error mode. Removing a queue that is not attached is a no-op.

      arguments:
      * q -- queue to remove
      """
      with self._lock:
         self._queues_to_unblock.pop ( id (q), None )

   def peek ( self ):
      """Returns the latest pushed ( context, error ) tuple.

      Raises: IndexError if nothing has been pushed so far.
      """
      return self._exceptions [-1]

   def get_all ( self ):
      """Returns all pushed ( context, error ) tuples, including no-op
      pushes.
      """
      # not copying, caller shouldn't modify the exception list
      return self._exceptions

   def get_exceptions ( self ):
      """Similar to get_all, but a generator that filters out no-op exception
      pushes that are used to trigger on-error mode without a valid exception.
      """
      for e in self._exceptions:
         # ImportError, SystemError and RuntimeError are subclasses of
         # Exception, so listing them separately is not necessary
         if isinstance ( e [1], ( Exception, KeyboardInterrupt ) ):
            yield e