Tasks provide a way of encapsulating and managing asynchronous actions in a composable manner. They are primarily used to perform some sort of I/O operation and/or manage a series of sub-tasks. Beam provides several classes to define your own tasks as well as classes that connect tasks to one another.
The task API in Beam is defined by two base classes: Beam::Tasks::Task, which represents a task, and Beam::Tasks::TaskFactory, which constructs tasks. In this article we will explore how we can use these two classes in ways that allow us to start, stop, modify and resume asynchronous actions.
The Task class defines two methods: an Execute() method to start the task and a Cancel() method to request that the task stop. Both operations are asynchronous, meaning that they return immediately and the actual operation of the task takes place in a separate thread of execution. To keep track of the progress of a task, one must monitor its state, which can be any of the following:
| NONE | The task has not yet been executed. |
| INITIALIZING | The task is performing some initialization; no sub-tasks may be executed during this state. |
| ACTIVE | The task is running; sub-tasks may be executed during this state. |
| PENDING_CANCEL | A request to cancel the task has been made; no new sub-tasks may be executed. |
| CANCELED | The task was canceled. |
| FAILED | The task could not complete due to an error. |
| EXPIRED | The task could not complete due to a time constraint. |
| COMPLETE | The task completed successfully. |
The states CANCELED, FAILED, EXPIRED and COMPLETE are the terminal states. Before a task can transition to a terminal state, all of its sub-tasks must be in a terminal state. Furthermore, once a task is in a terminal state it is not permitted to perform any additional action or transition to a different state.
The current state of the task, along with all of its transitions, is accessed through its Publisher, which is obtained by calling the GetPublisher() method. The publisher emits objects of type Beam::Tasks::Task::StateEntry, which contain two fields as follows:
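The two rules above can be sketched in a few lines of plain Python. This is an illustration only: the `State` enum, `is_terminal` and `can_terminate` below are hypothetical stand-ins written for this article, not Beam's own types.

```python
from enum import Enum, auto


class State(Enum):
    """Illustrative mirror of the task states (not Beam's own type)."""
    NONE = auto()
    INITIALIZING = auto()
    ACTIVE = auto()
    PENDING_CANCEL = auto()
    CANCELED = auto()
    FAILED = auto()
    EXPIRED = auto()
    COMPLETE = auto()


# The four states from which no further transition is permitted.
TERMINAL_STATES = frozenset(
    {State.CANCELED, State.FAILED, State.EXPIRED, State.COMPLETE})


def is_terminal(state):
    """Returns True if the state is one of the four terminal states."""
    return state in TERMINAL_STATES


def can_terminate(sub_task_states):
    """A task may only terminate once every one of its sub-tasks has."""
    return all(is_terminal(state) for state in sub_task_states)
```

A task with no sub-tasks can always terminate, whereas one with a sub-task still in the ACTIVE state has to wait for it.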
| m_state | Beam::Tasks::Task::State | The state of the task. |
| m_message | std::string | A message describing the reason for the transition. |
In effect, a task begins in the NONE state, is started via the Execute() method, and transitions into the INITIALIZING state, where it performs any setup needed. It then proceeds to the ACTIVE state, where it performs its operation, including executing any required sub-tasks. Finally, it terminates all of its sub-tasks and performs any final clean-up before ending in a terminal state. At any point after the INITIALIZING state it may encounter a cancel request (via the Cancel() method) or an error, either of which should result in cleaning up and terminating its sub-tasks before transitioning to the CANCELED or FAILED state respectively.
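To make the lifecycle concrete, here is a minimal sketch of a publisher that records state entries and notifies monitors. `StatePublisher` is a toy written for this article, not Beam's Publisher, and two of its behaviors are assumptions: it replays the history to late monitors, and it raises an error on a transition out of a terminal state.

```python
from collections import namedtuple

# Stand-in for Beam::Tasks::Task::StateEntry (state plus a message).
StateEntry = namedtuple('StateEntry', ['state', 'message'])

TERMINAL = {'CANCELED', 'FAILED', 'EXPIRED', 'COMPLETE'}


class StatePublisher:
    """Toy publisher: remembers every transition and notifies monitors."""

    def __init__(self):
        # Every task begins in the NONE state.
        self.entries = [StateEntry('NONE', '')]
        self.monitors = []

    def monitor(self, callback):
        # Assumption: replay the history so a late monitor misses nothing.
        for entry in self.entries:
            callback(entry)
        self.monitors.append(callback)

    def push(self, state, message=''):
        # Once terminal, no further transition is permitted.
        if self.entries[-1].state in TERMINAL:
            raise RuntimeError('Task already terminated.')
        entry = StateEntry(state, message)
        self.entries.append(entry)
        for callback in self.monitors:
            callback(entry)


publisher = StatePublisher()
seen = []
publisher.monitor(seen.append)
publisher.push('INITIALIZING')
publisher.push('ACTIVE')
publisher.push('PENDING_CANCEL', 'Cancel requested.')
publisher.push('CANCELED', 'Cancel requested.')
print([entry.state for entry in seen])
# → ['NONE', 'INITIALIZING', 'ACTIVE', 'PENDING_CANCEL', 'CANCELED']
```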
The task factory is responsible for creating new tasks by invoking its Create() method. In addition to this a task factory also keeps track of the parameters and state needed to create tasks. Setting a task's parameter is done by calling the factory's Set(name, value) method and retrieving a parameter is done via the Get(name) method.
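The parameter-keeping role of a factory can be sketched without Beam at all. `SketchTaskFactory` and `GreetingFactory` below are hypothetical classes. The names `set`, `get` and `define_property` follow the Python examples later in this article, while rejecting undefined names with a `KeyError` is an assumption of this sketch.

```python
class SketchTaskFactory:
    """Hypothetical stand-in for the property-keeping part of a factory."""

    def __init__(self):
        self._properties = {}

    def define_property(self, name, default):
        self._properties[name] = default

    def set(self, name, value):
        # Assumption of this sketch: only defined properties may be set.
        if name not in self._properties:
            raise KeyError(name)
        self._properties[name] = value

    def get(self, name):
        return self._properties[name]


class GreetingFactory(SketchTaskFactory):
    NAME = 'name'

    def __init__(self):
        super().__init__()
        self.define_property(GreetingFactory.NAME, 'world')

    def create(self):
        # The "task" is just a callable capturing the current parameters.
        name = self.get(GreetingFactory.NAME)
        return lambda: 'hello ' + name


factory = GreetingFactory()
first = factory.create()
factory.set(GreetingFactory.NAME, 'beam')
second = factory.create()
print(first(), '/', second())  # → hello world / hello beam
```

Note that changing a parameter only affects tasks created afterwards; the first task keeps the value it was built with.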
One additional operation that task factories perform is constructing what is called a continuation task. Continuations are a mechanism that allows us to resume a task that previously terminated and, beyond that, to modify the parameters of that task. In a sense, a continuation lets us take a terminated task, modify it, and then continue running it with those modifications.
To do this, task factories have a method PrepareContinuation(task). To use it, you pass it a task that has terminated, modify the factory's properties using the Set(name, value) method, and finally invoke the Create() method to get a newly constructed task that represents the continuation of the old task.
Not all tasks support continuations; those that do not will throw a Beam::NotSupportedException.
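The prepare, modify, create sequence can be illustrated with a synchronous toy. All of the classes below (`CountTask`, `CountTaskFactory`, `OneShotFactory`, `NotSupportedError`) are hypothetical and only mimic the shape of the API described above.

```python
class NotSupportedError(Exception):
    """Stand-in for Beam::NotSupportedException."""


class CountTask:
    """Toy task: counts up to a limit unless stopped early."""

    def __init__(self, limit):
        self.limit = limit
        self.counter = 0

    def run(self, stop_after=None):
        while self.counter < self.limit:
            if stop_after is not None and self.counter >= stop_after:
                return 'CANCELED'
            self.counter += 1
        return 'COMPLETE'


class CountTaskFactory:
    def __init__(self, limit):
        self.limit = limit
        self.continuation_task = None

    def set_limit(self, limit):
        self.limit = limit

    def prepare_continuation(self, task):
        # Remember the terminated task until the next call to create().
        self.continuation_task = task

    def create(self):
        done = 0
        if self.continuation_task is not None:
            # Resume where the terminated task left off.
            done = self.continuation_task.counter
            self.continuation_task = None
        return CountTask(self.limit - done)


class OneShotFactory:
    """A factory whose tasks can not be continued."""

    def create(self):
        return CountTask(1)

    def prepare_continuation(self, task):
        raise NotSupportedError('This task can not be continued.')


factory = CountTaskFactory(5)
task = factory.create()
first_result = task.run(stop_after=2)    # stops after counting to 2
factory.prepare_continuation(task)
factory.set_limit(7)                     # modify before continuing
resumed = factory.create()               # counts the remaining 7 - 2 = 5
second_result = resumed.run()
```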
A great deal of care must be taken to ensure that tasks properly transition from state to state, including managing sub-tasks, handling cancel requests, and so on. Beam therefore provides Beam::Tasks::BasicTask, a base class which can be inherited from to take care of much of the work needed to write a proper task. For example, it ensures that upon a cancel request the task transitions into the PENDING_CANCEL state and terminates all managed sub-tasks.
To make use of the BasicTask you need only implement the OnExecute() method to start your task, and the OnCancel() method to handle canceling your task.
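The division of labor between the base class and its two hooks is an instance of the template-method pattern. The sketch below is a synchronous, hypothetical `SketchBasicTask` and makes no claim about how the real BasicTask is implemented. It only shows that the base class owns the INITIALIZING and PENDING_CANCEL transitions while the subclass supplies `on_execute` and `on_cancel`.

```python
class SketchBasicTask:
    """Hypothetical base class that owns the bookkeeping transitions."""

    def __init__(self):
        self.state = 'NONE'
        self.history = ['NONE']

    def _set_state(self, state):
        self.state = state
        self.history.append(state)

    def execute(self):
        # A task can only be started once.
        if self.state != 'NONE':
            return
        self._set_state('INITIALIZING')
        self.on_execute()

    def cancel(self):
        # Ignore cancel requests unless the task is currently running.
        if self.state not in ('INITIALIZING', 'ACTIVE'):
            return
        self._set_state('PENDING_CANCEL')
        self.on_cancel()

    def set_active(self):
        self._set_state('ACTIVE')

    def set_terminal(self, state='COMPLETE'):
        self._set_state(state)

    def on_execute(self):
        raise NotImplementedError

    def on_cancel(self):
        raise NotImplementedError


class WaitForCancelTask(SketchBasicTask):
    """Becomes active and then does nothing until it is canceled."""

    def on_execute(self):
        self.set_active()

    def on_cancel(self):
        self.set_terminal('CANCELED')


task = WaitForCancelTask()
task.execute()
task.cancel()
task.cancel()  # ignored: the task is already in a terminal state
print(task.history)
# → ['NONE', 'INITIALIZING', 'ACTIVE', 'PENDING_CANCEL', 'CANCELED']
```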
To showcase a simple example, let's build a task that prints "hello" every second a given number of times. We will write a HelloTask class which inherits from BasicTask, and a HelloTaskFactory.
    import datetime
    import time

    import beam


    class HelloTask(beam.tasks.BasicTask):
        '''Prints hello a specified number of times.'''

        def __init__(self, timer, iterations):
            '''Constructs this task.
            :param timer: The Timer used to wait between successive prints.
            :param iterations: The number of times to print.
            '''

            # We must make sure to initialize the base class.
            beam.tasks.BasicTask.__init__(self)
            self.timer = timer
            self.iterations = iterations

            # The number of times we've printed so far.
            self.counter = 0

            # Used to handle timer callbacks.
            self.tasks = beam.RoutineTaskQueue()

        # This overrides BasicTask.on_execute.
        # When called we can assume that our Task is in the INITIALIZING state.
        def on_execute(self):

            # To initialize our task we will monitor our timer and then start
            # the timer.
            self.timer.get_publisher().monitor(
                self.tasks.get_slot(self.on_timer))
            self.timer.start()

            # Once the initialization is complete we transition to the ACTIVE
            # state.
            self.set_active()

        # This overrides BasicTask.on_cancel.
        # When called we can assume that our Task is in the PENDING_CANCEL state.
        # No new sub-tasks may be executed.
        def on_cancel(self):

            # In order to synchronize handling the cancel operation with the
            # timer, we will push a helper function onto our RoutineTaskQueue.
            # This ensures no race-conditions take place between the timer and
            # the cancel request.
            self.tasks.push(self._on_cancel)

        # This handles the timer expiry.
        def on_timer(self, result):
            if result == beam.threading.Timer.Result.EXPIRED:

                # This branch implies that our timer expired normally.
                print('hello')
                self.counter += 1
                if self.counter >= self.iterations:

                    # There is nothing more to print, so transition to
                    # a terminal state, which by default is the COMPLETE state.
                    self.set_terminal()
                else:

                    # There are still further iterations, restart the timer.
                    self.timer.start()
            else:

                # This branch implies that we canceled the timer in response
                # to a cancel request.
                # In this case we set our state to CANCELED.
                self.set_terminal(beam.tasks.Task.State.CANCELED)

        def _on_cancel(self):
            if self.counter < self.iterations:

                # Only cancel the timer if there are iterations remaining.
                self.timer.cancel()


    class HelloTaskFactory(beam.tasks.TaskFactory):
        '''Builds HelloTasks.'''

        # Typically the parameters that get passed into the Task are
        # defined as static constant strings in the TaskFactory.
        # This makes it easier to identify the properties of a task
        # as well as reference them (avoiding potential typos).
        ITERATIONS = 'iterations'

        def __init__(self, timer_factory):
            '''Constructs a HelloTaskFactory.
            :param timer_factory: Used to construct Timers.
            '''

            # Factories will typically inherit from TaskFactory as a base class.
            beam.tasks.TaskFactory.__init__(self)
            self.timer_factory = timer_factory

            # The constructor should define all the properties and default
            # values for those properties.
            self.define_property(HelloTaskFactory.ITERATIONS, 10)

            # This variable keeps track of continuations.
            self.continuation_task = None

        # This overrides the TaskFactory create method.
        def create(self):
            if self.continuation_task is None:

                # We are not creating a continuation task, so all we need to do
                # is construct a HelloTask using the ITERATIONS property defined
                # above.
                return HelloTask(
                    self.timer_factory(datetime.timedelta(seconds=1)),
                    self.get(HelloTaskFactory.ITERATIONS))
            else:

                # We are creating a continuation task. The continuation of a
                # HelloTask of N iterations that has already printed C times is
                # basically a HelloTask that prints N - C times.
                continuation = HelloTask(
                    self.timer_factory(datetime.timedelta(seconds=1)),
                    self.get(HelloTaskFactory.ITERATIONS) -
                    self.continuation_task.counter)
                self.continuation_task = None
                return continuation

        # This overrides the TaskFactory prepare_continuation method.
        def prepare_continuation(self, task):

            # We store the task to continue.
            self.continuation_task = task


    def main():

        # Construct the HelloTaskFactory using the LiveTimer for 5 iterations.
        factory = HelloTaskFactory(beam.threading.LiveTimer)
        factory.set(HelloTaskFactory.ITERATIONS, 5)
        task = factory.create()
        task.execute()

        # Let's sleep for 2 seconds before canceling the task.
        # It should print hello twice.
        time.sleep(2)
        task.cancel()

        # Wait for the task to enter the CANCELED state.
        beam.tasks.wait(task)

        # Build the continuation task and execute it. To do this we first
        # call prepare_continuation, then we make our modifications using the
        # set method, then we create the task and execute it.
        factory.prepare_continuation(task)
        factory.set(HelloTaskFactory.ITERATIONS, 7)
        task = factory.create()

        # Having printed twice already, we expect it to print hello five more
        # times (7 - 2) before coming to an end.
        task.execute()
        beam.tasks.wait(task)


    if __name__ == '__main__':
        main()
The above example works, but upon reflection we should notice that this task is responsible for two distinct things: printing and repeating. Given that the purpose of tasks is to compose asynchronous operations, we should separate these two responsibilities from one another. To do that, we will change our HelloTask so that all it does is print hello after a specified time period, and we will add a RepetitionTask which repeats a task a specified number of times. The benefit of this is that our RepetitionTask can be reused to repeat any task whatsoever down the road.
We will define it as follows:
    import copy
    import datetime
    import time

    import beam


    class RepetitionTask(beam.tasks.BasicTask):
        '''Repeats a Task a specified number of times.'''

        def __init__(self, task_factory, iterations):
            '''Constructs the Task.
            :param task_factory: Builds the Task to repeat.
            :param iterations: The number of times to repeat the Task.
            '''
            beam.tasks.BasicTask.__init__(self)

            # We should always make a deep copy of factories in order to
            # avoid modifying a factory belonging to another task or having
            # another task modify our factory.
            self.task_factory = copy.deepcopy(task_factory)
            self.iterations = iterations
            self.counter = 0

            # This stores the task currently being executed.
            self.task = None

            # This is used to handle callbacks from our tasks.
            self.tasks = beam.RoutineTaskQueue()

        def on_execute(self):

            # Sub-tasks may only be executed in the ACTIVE state, so we
            # transition to it before deferring to a helper function.
            self.set_active()
            self.execute_task()

        def on_cancel(self):

            # As before, to avoid race conditions between cancels and
            # our task we will push a callback onto a RoutineTaskQueue
            # to handle cancellations.
            self.tasks.push(self._on_cancel)

        def on_state(self, state_entry):

            # This method handles transitions of the task we're repeating.
            if state_entry.state == beam.tasks.Task.State.CANCELED:

                # This branch indicates that we canceled our task which
                # means that we're handling a cancel request.
                self.set_terminal(beam.tasks.Task.State.CANCELED)
            elif beam.tasks.is_terminal(state_entry.state):

                # This branch indicates that our task terminated and
                # hence we should repeat.
                self.execute_task()

        def _on_cancel(self):
            if self.counter < self.iterations:

                # Similar to before, only cancel if we still have
                # repetitions to process.
                self.task.cancel()

        def execute_task(self):
            if self.counter >= self.iterations:

                # This branch indicates there are no more iterations left.
                self.set_terminal()
            else:

                # This branch indicates that we need to repeat the task
                # by constructing a new one and executing it.
                self.counter += 1
                self.task = self.task_factory.create()
                self.task.get_publisher().monitor(
                    self.tasks.get_slot(self.on_state))
                self.task.execute()


    # This class is very similar to the HelloTaskFactory.
    class RepetitionTaskFactory(beam.tasks.TaskFactory):
        ITERATIONS = 'iterations'

        def __init__(self, task_factory):
            beam.tasks.TaskFactory.__init__(self)
            self.task_factory = copy.deepcopy(task_factory)
            self.define_property(RepetitionTaskFactory.ITERATIONS, 10)
            self.continuation_task = None

        def create(self):
            if self.continuation_task is None:
                return RepetitionTask(self.task_factory,
                    self.get(RepetitionTaskFactory.ITERATIONS))
            else:
                continuation = RepetitionTask(self.task_factory,
                    self.get(RepetitionTaskFactory.ITERATIONS) -
                    self.continuation_task.counter)
                self.continuation_task = None
                return continuation

        def prepare_continuation(self, task):
            self.continuation_task = task


    # Now that we have factored out the job of repeating tasks we can rewrite
    # our HelloTask as follows:

    class HelloTask(beam.tasks.BasicTask):
        def __init__(self, timer):
            beam.tasks.BasicTask.__init__(self)
            self.timer = timer
            self.tasks = beam.RoutineTaskQueue()

        def on_execute(self):
            self.timer.get_publisher().monitor(
                self.tasks.get_slot(self.on_timer))
            self.timer.start()
            self.set_active()

        def on_cancel(self):
            self.tasks.push(self._on_cancel)

        def on_timer(self, result):
            if result == beam.threading.Timer.Result.EXPIRED:

                # Only print when the timer expired normally, not when it
                # was canceled.
                print('hello')
                self.set_terminal()
            else:
                self.set_terminal(beam.tasks.Task.State.CANCELED)

        def _on_cancel(self):
            self.timer.cancel()


    class HelloTaskFactory(beam.tasks.TaskFactory):
        def __init__(self, timer_factory):
            beam.tasks.TaskFactory.__init__(self)
            self.timer_factory = timer_factory

        def create(self):
            return HelloTask(self.timer_factory(datetime.timedelta(seconds=1)))


    # Finally once these two pieces are in place we can combine them as follows:

    def main():

        # First build the HelloTaskFactory.
        hello_factory = HelloTaskFactory(beam.threading.LiveTimer)

        # Pass the above factory into the RepetitionTaskFactory.
        # The result is a factory that composes a RepetitionTask with
        # a HelloTask.
        factory = RepetitionTaskFactory(hello_factory)
        factory.set(RepetitionTaskFactory.ITERATIONS, 5)

        # Now we can use the factory similarly to how we used it before.
        task = factory.create()
        task.execute()
        time.sleep(2)
        task.cancel()
        beam.tasks.wait(task)
        factory.prepare_continuation(task)
        factory.set(RepetitionTaskFactory.ITERATIONS, 7)
        task = factory.create()
        task.execute()
        beam.tasks.wait(task)


    if __name__ == '__main__':
        main()
In actuality, we can take this decomposition one step further by separating the responsibility of printing 'hello' from the responsibility of running a task after a specified period of time. If we so desired, we would write a TimerTask/TimerTaskFactory, and our final composition would then be along the lines of RepetitionTaskFactory(TimerTaskFactory(HelloTaskFactory(), beam.threading.LiveTimer)).
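The shape of that three-layer composition can be sketched with a synchronous toy that does not use Beam. `HelloFactory`, `DelayFactory` and `RepeatFactory` are hypothetical names, the "tasks" are plain callables, and the sleep function is injected in the same spirit as the timer factory above so that the sketch can run without actually waiting.

```python
class HelloFactory:
    """Builds an action whose only job is to produce 'hello'."""

    def create(self):
        return lambda: 'hello'


class DelayFactory:
    """Wraps a factory so that each created action runs after a delay."""

    def __init__(self, factory, sleep, delay):
        self.factory = factory
        self.sleep = sleep    # for example time.sleep in real use
        self.delay = delay

    def create(self):
        action = self.factory.create()

        def delayed():
            self.sleep(self.delay)
            return action()
        return delayed


class RepeatFactory:
    """Wraps a factory so that its action is rebuilt and run repeatedly."""

    def __init__(self, factory, iterations):
        self.factory = factory
        self.iterations = iterations

    def create(self):
        def repeated():
            # A fresh inner action is created for every repetition.
            return [self.factory.create()() for _ in range(self.iterations)]
        return repeated


# Record the requested delays instead of really sleeping.
delays = []
factory = RepeatFactory(DelayFactory(HelloFactory(), delays.append, 1), 3)
result = factory.create()()
print(result)  # → ['hello', 'hello', 'hello']
```

Each layer knows only about the factory it wraps, which is what lets the repetition and delay layers be reused with any other inner factory.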