Tasks
Tasks provide a way of encapsulating and managing asynchronous actions in a composable manner. They are primarily used to perform some sort of I/O operation and/or manage a series of sub-tasks. Beam provides several classes to define your own tasks as well as classes that connect tasks to one another.
The task API in Beam is defined by two base classes, the class Beam::Tasks::Task which represents a task, and the class Beam::Tasks::TaskFactory which constructs tasks. In this article we will explore how we can use these two classes in ways that allow us to start, stop, modify and resume asynchronous actions.
Contents
Interface
Task
The Task class defines two methods, an Execute() method to start the task and a Cancel() method to request that the task stop. Both operations are asynchronous, meaning that they return immediately and the actual operation of the task takes place in a separate thread of execution. In order to keep track of the progress of a task one must monitor its state which can be any of the following:
State | Description |
---|---|
NONE | The task has not yet been executed. |
INITIALIZING | The task is performing some initialization, during this state no sub-tasks may be executed. |
ACTIVE | The task is running, during this state sub-tasks may be executed. |
PENDING_CANCEL | A request to cancel the task has been made, no new sub-tasks may be executed. |
CANCELED | The task was canceled. |
FAILED | The task could not complete due to an error. |
EXPIRED | The task could not complete due to a time constraint. |
COMPLETE | The task completed successfully. |
The states CANCELED, FAILED, EXPIRED and COMPLETE all represent terminal states. Before a task can transition to a terminal state, all of its sub tasks must be in a terminal state. Furthermore once a task is in a terminal state the task is not permitted to perform any additional action or transition to a different state.
The current state of the task along with all of its transitions is accessed through its Publisher by calling the GetPublisher() method. The publisher will emit objects of type Beam::Tasks::Task::StateEntry which contains two fields as follows:
Name | Type | Description |
---|---|---|
m_state | Beam::Tasks::Task::State | The state of the task. |
m_message | std::string | A message describing the reason for the transition. |
In effect a task begins in the NONE state, is then started via the Execute() method, transitions into the INITIALIZATION state where it performs any setup needed, then proceeds to the ACTIVE state where it performs its operation including executing any required sub-tasks, and then finally terminates all of its sub-tasks and performs any final clean-up before ending in a terminal state. During any point after the INITIALIZATION state it may encounter a cancel request (via the Cancel() method) or an error, either of which should result in cleaning-up and terminating its sub-tasks before transitioning to the FAILED or CANCELED state.
Task Factory
The task factory is responsible for creating new tasks by invoking its Create() method. In addition to this a task factory also keeps track of the parameters and state needed to create tasks. Setting a task's parameter is done by calling the factory's Set(name, value) method and retrieving a parameter is done via the Get(name) method.
Continuations
One additional operation that task factories perform is constructing what is called a continuation task. Continuations are a mechanism that allow us to resume a task that previously terminated, but more than that they also allow us to modify the parameters of that task. In a sense, a continuation lets us take a terminated task, modify it, and then continue running it with those modifications.
To do this, task factories have a method PrepareContinuation(task). To use it you pass into it a task that has terminated, then you modify its properties using the Set(name, value) method, and then finally you invoke the Create() method to get a newly constructed task that represents the continuation of the old task.
Not all tasks support continuations, those that do not will throw a Beam::NotSupportedException.
BasicTask
As a great deal of care must be taken to ensure that tasks properly transition from State to State, including managing sub-tasks, handling cancel requests etc... Beam provides the Beam::Tasks::BasicTask as a base class which can be inherited from to take care of much of the work needed to write a proper task. For example it ensures that upon a cancel request that the task transitions into the PENDING_CANCEL state and terminates all managed sub-tasks.
To make use of a the BasicTask you need only implement the OnExecute() method to start your task, and the OnCancel() method to handle cancelling your class.
Example
To showcase a simple example, let's build a task that prints "hello" every second a given number of times. We will write a HelloTask class which inherits from BasicTask, and a HelloTaskFactory.
First Attempt
1 import datetime
2 import time
3
4 import beam
5
6 class HelloTask(beam.tasks.BasicTask):
7 '''Prints hello a specified number of times.'''
8
9 def __init__(self, timer, iterations):
10 ''' Constructs this task.
11 :param timer: The Timer used to wait between successive prints.
12 :param iterations: The number of times to print.
13 '''
14
15 # We must make sure to initialize the base class.
16 beam.tasks.BasicTask.__init__(self)
17 self.timer = timer
18 self.iterations = iterations
19
20 # The number of times we've printed so far.
21 self.counter = 0
22
23 # Used to handle timer callbacks.
24 self.tasks = beam.RoutineTaskQueue()
25
26 # This overrides BasicTask.on_execute.
27 # When called we can assume that our Task is in the INITIALIZATION state.
28 def on_execute(self):
29
30 # To initialize our task we will monitor our timer and then start the timer.
31 self.timer.get_publisher().monitor(self.tasks.get_slot(self.on_timer))
32 self.timer.start()
33
34 # Once the initialization is complete we transition to the ACTIVE state.
35 self.set_active()
36
37 # This overrides BasicTask.on_cancel.
38 # When called we can assume that our Task is in the PENDING_CANCEL state.
39 # No new sub-tasks may be executed.
40 def on_cancel(self):
41
42 # In order to synchronize handling the cancel operation with the timer,
43 # we will push a helper function onto our RoutineTaskQueue.
44 # This ensures no race-conditions take place between the timer and
45 # the cancel request.
46 self.tasks.push(self._on_cancel)
47
48 # This handles the timer expiry.
49 def on_timer(self, result):
50 if result == beam.threading.Timer.Result.EXPIRED:
51
52 # This branch implies that our timer expired normally.
53 print('hello')
54 self.counter += 1
55 if self.counter >= self.iterations:
56
57 # There is nothing more to print, so transition to
58 # a terminal state, which by default is the COMPLETE state.
59 self.set_terminal()
60 else:
61
62 # There are still further iterations, restart the timer.
63 self.timer.start()
64 else:
65
66 # This branch implies that we canceled the timer in response
67 # to a cancel request.
68 # In this case we set our state to CANCELED.
69 self.set_terminal(beam.tasks.Task.State.CANCELED)
70
71 def _on_cancel(self):
72 if self.counter < self.iterations:
73
74 # Only cancel the timer if there are iterations remaining.
75 self.timer.cancel()
76
77 class HelloTaskFactory(beam.tasks.TaskFactory):
78 '''Builds HelloTasks.'''
79
80 # Typically the parameters that get passed into the Task are
81 # defined as static constant strings in the TaskFactory.
82 # This makes it easier to identify the properties of a task
83 # as well as reference them (avoiding potential typos).
84 ITERATIONS = 'iterations'
85
86 def __init__(self, timer_factory):
87 '''Constructs a HelloTaskFactory.
88 :param timer_factory: Used to construct Timers.
89 '''
90
91 # Factories will typically inherit from TaskFactory as a base class.
92 beam.tasks.TaskFactory.__init__(self)
93 self.timer_factory = timer_factory
94
95 # The constructor should define all the properties and default
96 # values for those properties.
97 self.define_property(HelloTaskFactory.ITERATIONS, 10)
98
99 # This variable keeps track of continuations.
100 self.continuation_task = None
101
102 # This overrides the TaskFactory create method.
103 def create(self):
104
105 if self.continuation_task is None:
106
107 # We are not creating a continuation task, so all we need to do is
108 # construct a HelloTask using the ITERATIONS property defined above.
109 return HelloTask(self.timer_factory(datetime.timedelta(seconds = 1)),
110 self.get(HelloTaskFactory.ITERATIONS))
111 else:
112
113 # We are creating a continuation task. The continuation of a HelloTask of
114 # N iterations that has already printed C times is basically a HelloTask
115 # that prints N - C times.
116 continuation = HelloTask(
117 self.timer_factory(datetime.timedelta(seconds = 1)),
118 self.get(HelloTaskFactory.ITERATIONS) - self.continuation_task.counter)
119 self.continuation_task = None
120 return continuation
121
122 # This overrides the TaskFactory prepare_continuation method.
123 def prepare_continuation(self, task):
124
125 # We store the task to continue.
126 self.continuation_task = task
127
128 def main():
129
130 # Construct the HelloTaskFactory using the LiveTimer for 5 iterations.
131 factory = HelloTaskFactory(beam.threading.LiveTimer)
132 factory.set(HelloTaskFactory.ITERATIONS, 5)
133 task = factory.create()
134 task.execute()
135
136 # Let's sleep for 2 seconds before canceling the task.
137 # It should print hello twice.
138 time.sleep(2)
139 task.cancel()
140
141 # Wait for the task to enter the CANCELED state.
142 beam.tasks.wait(task)
143
144 # Build the continuation task and execute it. To do this we first
145 # call prepare_continuation, then we make our modifications using the set
146 # method, then we create the task and execute it.
147 factory.prepare_continuation(task)
148 factory.set(HelloTaskFactory.ITERATIONS, 7)
149 task = factory.create()
150
151 # We expect it to print hello three times before coming to an end.
152 task.execute()
153 beam.tasks.wait(task)
154
155 if __name__ == '__main__':
156 main()
Decomposition
The above example works, but upon reflection we should notice that this Task is responsible for two distinct things. One is the responsibility of printing, and the other is the responsibility of repeating. Given that the purpose of tasks is to be able to compose asynchronous operations we should separate these two responsibilities from one another. To do that we will change our HelloTask so that all it does is print hello after a specified time period, followed by a RepetitionTask which repeats a task a specified number of times. The benefit of this is that our RepetitionTask can be reused to repeat any task whatsoever down the road.
We will define it as follows:
1 import copy
2 import datetime
3 import time
4
5 import beam
6
7 class RepetitionTask(beam.tasks.BasicTask):
8 '''Repeats a Task a specified number of times.'''
9
10 def __init__(self, task_factory, iterations):
11 '''Constructs the Task.
12 :param task_factory: Builds the Task to repeat.
13 :param iterations: The number of times to repeat the Task.
14 '''
15 beam.tasks.BasicTask.__init__(self)
16
17 # We should always make a deep copy of factories in order to
18 # avoid modifying a factory belonging to another task or having
19 # another task modify our factory.
20 self.task_factory = copy.deepcopy(task_factory)
21 self.iterations = iterations
22 self.counter = 0
23
24 # This stores the task currently being executed.
25 self.task = None
26
27 # This is used to handle callbacks from our tasks.
28 self.tasks = beam.RoutineTaskQueue()
29
30 def on_execute(self):
31
32 # Defer to a helper function.
33 self.execute_task()
34
35 def on_cancel(self):
36
37 # As before, to avoid race conditions between cancels and
38 # our task we will push a callback onto a RoutineTaskQueue
39 # to handle cancellations.
40 self.tasks.push(self._on_cancel)
41
42 def on_state(self, state_entry):
43
44 # This method handles transitions of the task we're repeating.
45 if state_entry.state == beam.tasks.Task.State.CANCELED:
46
47 # This branch indicates that we canceled our task which
48 # means that we're handling a cancel request.
49 self.set_terminal(beam.tasks.Task.State.CANCELED)
50 elif beam.tasks.is_terminal(state_entry.state):
51
52 # This branch indicates that our task terminated and
53 # hence we should repeat.
54 self.execute_task()
55
56 def _on_cancel(self):
57 if self.counter < self.iterations:
58
59 # Similar to before, only cancel if we still have
60 # repetitions to process.
61 self.task.cancel()
62
63 def execute_task(self):
64 if self.counter >= self.iterations:
65
66 # This branch indicates there are no more iterations left.
67 self.set_terminal()
68 else:
69
70 # This branch indicates that we need to repeat the task
71 # by constructing a new one and executing it.
72 self.counter += 1
73 self.task = self.task_factory.create()
74 self.task.get_publisher().monitor(self.tasks.get_slot(self.on_state))
75 self.task.execute()
76
77 # This class is very similar to the HelloTaskFactory.
78 class RepetitionTaskFactory(beam.tasks.TaskFactory):
79 ITERATIONS = 'iterations'
80
81 def __init__(self, task_factory):
82 beam.tasks.TaskFactory.__init__(self)
83 self.task_factory = copy.deepcopy(task_factory)
84 self.define_property(RepetitionTaskFactory.ITERATIONS, 10)
85 self.continuation_task = None
86
87 def create(self):
88 if self.continuation_task is None:
89 return RepetitionTask(self.task_factory,
90 self.get(RepetitionTaskFactory.ITERATIONS))
91 else:
92 continuation = RepetitionTask(self.task_factory,
93 self.get(RepetitionTaskFactory.ITERATIONS) -
94 self.continuation_task.counter)
95 self.continuation_task = None
96 return continuation
97
98 def prepare_continuation(self, task):
99 self.continuation_task = task
100
101
102 # Now that we have factored out the job of repeating tasks we can rewrite our
103 # HelloTask as follows:
104
105 class HelloTask(beam.tasks.BasicTask):
106 def __init__(self, timer):
107 beam.tasks.BasicTask.__init__(self)
108 self.timer = timer
109 self.tasks = beam.RoutineTaskQueue()
110
111 def on_execute(self):
112 self.timer.get_publisher().monitor(self.tasks.get_slot(self.on_timer))
113 self.timer.start()
114 self.set_active()
115
116 def on_cancel(self):
117 self.tasks.push(self._on_cancel)
118
119 def on_timer(self, result):
120 print('hello')
121 if result == beam.threading.Timer.Result.EXPIRED:
122 self.set_terminal()
123 else:
124 self.set_terminal(beam.tasks.Task.State.CANCELED)
125
126 def _on_cancel(self):
127 self.timer.cancel()
128
129 class HelloTaskFactory(beam.tasks.TaskFactory):
130 def __init__(self, timer_factory):
131 beam.tasks.TaskFactory.__init__(self)
132 self.timer_factory = timer_factory
133
134 def create(self):
135 return HelloTask(self.timer_factory(datetime.timedelta(seconds = 1)))
136
137 # Finally once these two pieces are in place we can combine them as follows:
138
139 def main():
140
141 # First build the HelloTaskFactory
142 hello_factory = HelloTaskFactory(beam.threading.LiveTimer)
143
144 # Pass the above factory into the RepetitionTaskFactory.
145 # The result is a factory that composes a RepetitionTask with
146 # a HelloTask.
147 factory = RepetitionTaskFactory(hello_factory)
148 factory.set(RepetitionTaskFactory.ITERATIONS, 5)
149
150 # Now we can use the factory similarly to how we used it before.
151 task = factory.create()
152 task.execute()
153 time.sleep(2)
154 task.cancel()
155 beam.tasks.wait(task)
156 factory.prepare_continuation(task)
157 factory.set(RepetitionTaskFactory.ITERATIONS, 7)
158 task = factory.create()
159 task.execute()
160 beam.tasks.wait(task)
161
162 if __name__ == '__main__':
163 main()
In actuality, we can take this decomposition one step further by factoring out from the HelloTask the responsibility of printing 'hello' with the responsibility of running a task after a specified period of time. If we so desired we would write a TimerTask/TimerTaskFactory and then our final composition would be along the lines of a RepetitionTaskFactory(TimerTaskFactory(HelloTaskFactory(), beam.threading.LiveTimer)).