Tasks
Tasks provide a way of encapsulating and managing asynchronous actions in a composable manner. They are primarily used to perform some sort of I/O operation and/or manage a series of sub-tasks. Beam provides several classes to define your own tasks as well as classes that connect tasks to one another.
The task API in Beam is defined by two base classes, the class Beam::Tasks::Task which represents a task, and the class Beam::Tasks::TaskFactory which constructs tasks. In this article we will explore how we can use these two classes in ways that allow us to start, stop, modify and resume asynchronous actions.
Interface
Task
The Task class defines two methods: an Execute() method to start the task and a Cancel() method to request that the task stop. Both operations are asynchronous, meaning that they return immediately and the actual operation of the task takes place in a separate thread of execution. To keep track of a task's progress, one must monitor its state, which can be any of the following:
State | Description |
---|---|
NONE | The task has not yet been executed. |
INITIALIZING | The task is performing some initialization; during this state no sub-tasks may be executed. |
ACTIVE | The task is running; during this state sub-tasks may be executed. |
PENDING_CANCEL | A request to cancel the task has been made; no new sub-tasks may be executed. |
CANCELED | The task was canceled. |
FAILED | The task could not complete due to an error. |
EXPIRED | The task could not complete due to a time constraint. |
COMPLETE | The task completed successfully. |
The states CANCELED, FAILED, EXPIRED and COMPLETE are the terminal states. Before a task can transition to a terminal state, all of its sub-tasks must be in a terminal state. Furthermore, once a task is in a terminal state it is not permitted to perform any additional action or transition to a different state.
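The terminal-state rules can be sketched in a few lines of plain Python. This is only an illustration: the `State` enum below mirrors the table above, but its numeric values and the `can_terminate` helper are assumptions made for this sketch and are not part of Beam.

```python
import enum


class State(enum.Enum):
    # Names follow the table above; the numeric values are arbitrary.
    NONE = 0
    INITIALIZING = 1
    ACTIVE = 2
    PENDING_CANCEL = 3
    CANCELED = 4
    FAILED = 5
    EXPIRED = 6
    COMPLETE = 7


TERMINAL_STATES = frozenset(
    {State.CANCELED, State.FAILED, State.EXPIRED, State.COMPLETE})


def is_terminal(state):
    '''Returns True if no further transitions are allowed from state.'''
    return state in TERMINAL_STATES


def can_terminate(sub_task_states):
    '''Hypothetical helper: a task may only enter a terminal state once
    every one of its sub-tasks is in a terminal state.'''
    return all(is_terminal(state) for state in sub_task_states)
```

A task with no sub-tasks can always terminate, since `can_terminate([])` is trivially true.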
The current state of the task, along with all of its transitions, is accessed through its Publisher, which is obtained by calling the GetPublisher() method. The publisher emits objects of type Beam::Tasks::Task::StateEntry, which contain two fields as follows:
Name | Type | Description |
---|---|---|
m_state | Beam::Tasks::Task::State | The state of the task. |
m_message | std::string | A message describing the reason for the transition. |
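To make the publisher idea concrete, here is a minimal, single-threaded sketch of a publisher that emits state entries to its monitors. The `StatePublisher` class and its `push` method are hypothetical stand-ins written for this illustration; in particular, replaying earlier entries to a late monitor is an assumption based on the description above, not a statement about Beam's implementation.

```python
import collections

# Mirrors the two fields of StateEntry (m_state and m_message).
StateEntry = collections.namedtuple('StateEntry', ['state', 'message'])


class StatePublisher:
    '''Hypothetical stand-in for a task's publisher.'''

    def __init__(self):
        self._entries = []
        self._slots = []

    def monitor(self, slot):
        # Replay the history first so that a late monitor still sees
        # the current state and every earlier transition.
        for entry in self._entries:
            slot(entry)
        self._slots.append(slot)

    def push(self, state, message=''):
        entry = StateEntry(state, message)
        self._entries.append(entry)
        for slot in self._slots:
            slot(entry)


publisher = StatePublisher()
publisher.push('NONE')

# Start monitoring after the first entry was already published.
received = []
publisher.monitor(received.append)
publisher.push('INITIALIZING')
publisher.push('ACTIVE')
publisher.push('FAILED', 'Connection lost.')
```

After this runs, `received` holds four entries, ending with the FAILED entry and its message.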
In effect, a task begins in the NONE state and is started via the Execute() method. It transitions into the INITIALIZING state, where it performs any setup needed, then proceeds to the ACTIVE state, where it performs its operation, including executing any required sub-tasks. Finally, it terminates all of its sub-tasks and performs any final clean-up before ending in a terminal state. At any point after the INITIALIZING state it may encounter a cancel request (via the Cancel() method) or an error, either of which should result in cleaning up and terminating its sub-tasks before transitioning to the CANCELED or FAILED state, respectively.
Task Factory
The task factory is responsible for creating new tasks, which is done by invoking its Create() method. In addition, a task factory keeps track of the parameters and state needed to create tasks. A task's parameter is set by calling the factory's Set(name, value) method and retrieved via the Get(name) method.
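The parameter bookkeeping can be pictured as a small named-property store. The sketch below is a plain-Python stand-in, not Beam's TaskFactory: the method names follow the Python example later in this article, while rejecting undefined names with a `KeyError` is an assumption made here for illustration.

```python
class PropertyStore:
    '''Hypothetical stand-in for a factory's parameter bookkeeping.'''

    def __init__(self):
        self._properties = {}

    def define_property(self, name, default):
        # Registers a parameter along with its default value.
        self._properties[name] = default

    def get(self, name):
        return self._properties[name]

    def set(self, name, value):
        # Assumption: only previously defined parameters may be set,
        # which catches misspelled names early.
        if name not in self._properties:
            raise KeyError('Undefined property: ' + name)
        self._properties[name] = value


factory = PropertyStore()
factory.define_property('iterations', 10)
default_iterations = factory.get('iterations')
factory.set('iterations', 5)
updated_iterations = factory.get('iterations')
```

Defining parameter names as constants on the factory class, as the example later in this article does, avoids scattering string literals through the calling code.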
Continuations
One additional operation that task factories perform is constructing what is called a continuation task. Continuations are a mechanism that allows us to resume a task that previously terminated and, beyond that, to modify the parameters of that task. In a sense, a continuation lets us take a terminated task, modify it, and then continue running it with those modifications.
To do this, task factories have a method PrepareContinuation(task). To use it you pass into it a task that has terminated, then you modify its properties using the Set(name, value) method, and then finally you invoke the Create() method to get a newly constructed task that represents the continuation of the old task.
Not all tasks support continuations; those that do not will throw a Beam::NotSupportedException.
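The prepare, modify, create sequence can be sketched without any threading. Everything below is a hypothetical, synchronous stand-in written for this illustration: `CountingTask.run` simulates a task that is stopped part-way, and `NotImplementedError` stands in for Beam::NotSupportedException (raising it from `prepare_continuation` is an assumption).

```python
class CountingTask:
    '''Hypothetical task that performs a fixed number of work units.'''

    def __init__(self, iterations):
        self.iterations = iterations
        self.counter = 0

    def run(self, steps):
        # Performs at most `steps` units, simulating an early cancel.
        self.counter += min(steps, self.iterations - self.counter)


class CountingTaskFactory:
    ITERATIONS = 'iterations'

    def __init__(self):
        self.properties = {CountingTaskFactory.ITERATIONS: 10}
        self.continuation_task = None

    def set(self, name, value):
        self.properties[name] = value

    def prepare_continuation(self, task):
        # Remember the terminated task until the next call to create.
        self.continuation_task = task

    def create(self):
        iterations = self.properties[CountingTaskFactory.ITERATIONS]
        if self.continuation_task is not None:
            # Only the work the old task did not finish remains.
            iterations = max(0, iterations - self.continuation_task.counter)
            self.continuation_task = None
        return CountingTask(iterations)


class OneShotTaskFactory:
    '''Hypothetical factory whose tasks cannot be continued.'''

    def prepare_continuation(self, task):
        raise NotImplementedError('Continuations are not supported.')


factory = CountingTaskFactory()
factory.set(CountingTaskFactory.ITERATIONS, 5)
task = factory.create()
task.run(2)                           # Stopped after 2 of 5 units.
factory.prepare_continuation(task)
factory.set(CountingTaskFactory.ITERATIONS, 7)
continuation = factory.create()       # 7 - 2 = 5 units remain.
```

Note that the factory clears its stored task once the continuation is created, so a later `create()` call builds an ordinary task again.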
BasicTask
A great deal of care must be taken to ensure that tasks properly transition from state to state, including managing sub-tasks, handling cancel requests, and so on. To help with this, Beam provides Beam::Tasks::BasicTask, a base class that takes care of much of the work needed to write a proper task. For example, it ensures that upon a cancel request the task transitions into the PENDING_CANCEL state and terminates all managed sub-tasks.
To make use of BasicTask you need only implement the OnExecute() method to start your task and the OnCancel() method to handle canceling it.
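The division of labor between the base class and its two hooks can be illustrated with a simplified, synchronous stand-in. `SketchBasicTask` below is not Beam's BasicTask: it has no threads, publisher or sub-task management, and details such as ignoring a cancel request on a task that is not running are assumptions of this sketch.

```python
class SketchBasicTask:
    '''Hypothetical, single-threaded stand-in for BasicTask.'''
    TERMINAL = ('CANCELED', 'FAILED', 'EXPIRED', 'COMPLETE')

    def __init__(self):
        # Records every state the task has been in, oldest first.
        self.states = ['NONE']

    @property
    def state(self):
        return self.states[-1]

    def execute(self):
        if self.state != 'NONE':
            return
        self.states.append('INITIALIZING')
        self.on_execute()

    def cancel(self):
        # The base class handles the PENDING_CANCEL transition so
        # that subclasses only deal with their own clean-up.
        if self.state in ('INITIALIZING', 'ACTIVE'):
            self.states.append('PENDING_CANCEL')
            self.on_cancel()

    def set_active(self):
        self.states.append('ACTIVE')

    def set_terminal(self, state='COMPLETE'):
        # Once terminal, no further transitions are recorded.
        if self.state not in SketchBasicTask.TERMINAL:
            self.states.append(state)

    def on_execute(self):
        raise NotImplementedError

    def on_cancel(self):
        raise NotImplementedError


class WaitForCancelTask(SketchBasicTask):
    '''Stays active until it is canceled.'''

    def on_execute(self):
        self.set_active()

    def on_cancel(self):
        self.set_terminal('CANCELED')


task = WaitForCancelTask()
task.execute()
task.cancel()
task.cancel()    # Ignored: the task is already in a terminal state.
```

The subclass never touches the state history directly; it only signals when initialization is done and when it has finished cleaning up.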
Example
To showcase a simple example, let's build a task that prints "hello" every second a given number of times. We will write a HelloTask class which inherits from BasicTask, and a HelloTaskFactory.
First Attempt
    import beam
    import datetime
    import time

    class HelloTask(beam.tasks.BasicTask):
        '''Prints hello a specified number of times.'''

        def __init__(self, timer, iterations):
            ''' Constructs this task.
            :param timer: The Timer used to wait between successive prints.
            :param iterations: The number of times to print.
            '''

            # We must make sure to initialize the base class.
            beam.tasks.BasicTask.__init__(self)
            self.timer = timer
            self.iterations = iterations

            # The number of times we've printed so far.
            self.counter = 0

            # Used to handle timer callbacks.
            self.tasks = beam.RoutineTaskQueue()

        # This overrides BasicTask.on_execute.
        # When called we can assume that our Task is in the INITIALIZING state.
        def on_execute(self):

            # To initialize our task we will monitor our timer and then start the timer.
            self.timer.get_publisher().monitor(self.tasks.get_slot(self.on_timer))
            self.timer.start()

            # Once the initialization is complete we transition to the ACTIVE state.
            self.set_active()

        # This overrides BasicTask.on_cancel.
        # When called we can assume that our Task is in the PENDING_CANCEL state.
        # No new sub-tasks may be executed.
        def on_cancel(self):

            # In order to synchronize handling the cancel operation with the timer,
            # we will push a helper function onto our RoutineTaskQueue.
            # This ensures no race-conditions take place between the timer and
            # the cancel request.
            self.tasks.push(self._on_cancel)

        # This handles the timer expiry.
        def on_timer(self, result):
            if result == beam.threading.Timer.Result.EXPIRED:

                # This branch implies that our timer expired normally.
                print('hello')
                self.counter += 1
                if self.counter >= self.iterations:

                    # There is nothing more to print, so transition to
                    # a terminal state, which by default is the COMPLETE state.
                    self.set_terminal()
                else:

                    # There are still further iterations, restart the timer.
                    self.timer.start()
            else:

                # This branch implies that we canceled the timer in response
                # to a cancel request.
                # In this case we set our state to CANCELED.
                self.set_terminal(beam.tasks.Task.State.CANCELED)

        def _on_cancel(self):
            if self.counter < self.iterations:

                # Only cancel the timer if there are iterations remaining.
                self.timer.cancel()

    class HelloTaskFactory(beam.tasks.TaskFactory):
        '''Builds HelloTasks.'''

        # Typically the parameters that get passed into the Task are
        # defined as static constant strings in the TaskFactory.
        # This makes it easier to identify the properties of a task
        # as well as reference them (avoiding potential typos).
        ITERATIONS = 'iterations'

        def __init__(self, timer_factory):
            '''Constructs a HelloTaskFactory.
            :param timer_factory: Used to construct Timers.
            '''

            # Factories will typically inherit from TaskFactory as a base class.
            beam.tasks.TaskFactory.__init__(self)
            self.timer_factory = timer_factory

            # The constructor should define all the properties and default
            # values for those properties.
            self.define_property(HelloTaskFactory.ITERATIONS, 10)

            # This variable keeps track of continuations.
            self.continuation_task = None

        # This overrides the TaskFactory create method.
        def create(self):

            if self.continuation_task is None:

                # We are not creating a continuation task, so all we need to do is
                # construct a HelloTask using the ITERATIONS property defined above.
                return HelloTask(
                    self.timer_factory(datetime.timedelta(seconds=1)),
                    self.get(HelloTaskFactory.ITERATIONS))
            else:

                # We are creating a continuation task. The continuation of a HelloTask of
                # N iterations that has already printed C times is basically a HelloTask
                # that prints N - C times.
                continuation = HelloTask(
                    self.timer_factory(datetime.timedelta(seconds=1)),
                    self.get(HelloTaskFactory.ITERATIONS) -
                    self.continuation_task.counter)
                self.continuation_task = None
                return continuation

        # This overrides the TaskFactory prepare_continuation method.
        def prepare_continuation(self, task):

            # We store the task to continue.
            self.continuation_task = task

    def main():

        # Construct the HelloTaskFactory using the LiveTimer for 5 iterations.
        factory = HelloTaskFactory(beam.threading.LiveTimer)
        factory.set(HelloTaskFactory.ITERATIONS, 5)
        task = factory.create()
        task.execute()

        # Let's sleep for 2 seconds before canceling the task.
        # It should print hello twice.
        time.sleep(2)
        task.cancel()

        # Wait for the task to enter the CANCELED state.
        beam.tasks.wait(task)

        # Build the continuation task and execute it. To do this we first
        # call prepare_continuation, then we make our modifications using the set
        # method, then we create the task and execute it.
        factory.prepare_continuation(task)
        factory.set(HelloTaskFactory.ITERATIONS, 7)
        task = factory.create()

        # With ITERATIONS now set to 7 and two prints already done, we expect
        # it to print hello five more times before coming to an end.
        task.execute()
        beam.tasks.wait(task)

    if __name__ == '__main__':
        main()
Decomposition
The above example works, but on reflection this Task is responsible for two distinct things: printing and repeating. Given that the purpose of tasks is to compose asynchronous operations, we should separate these two responsibilities from one another. To do that, we will change our HelloTask so that all it does is print hello once after a specified time period, and introduce a RepetitionTask which repeats a task a specified number of times. The benefit is that our RepetitionTask can be reused to repeat any task whatsoever down the road.
We will define it as follows:
    import copy
    import datetime
    import time

    import beam

    class RepetitionTask(beam.tasks.BasicTask):
        '''Repeats a Task a specified number of times.'''

        def __init__(self, task_factory, iterations):
            '''Constructs the Task.
            :param task_factory: Builds the Task to repeat.
            :param iterations: The number of times to repeat the Task.
            '''
            beam.tasks.BasicTask.__init__(self)

            # We should always make a deep copy of factories in order to
            # avoid modifying a factory belonging to another task or having
            # another task modify our factory.
            self.task_factory = copy.deepcopy(task_factory)
            self.iterations = iterations

            # The number of repetitions that have run to completion.
            self.counter = 0

            # This stores the task currently being executed.
            self.task = None

            # This is used to handle callbacks from our tasks.
            self.tasks = beam.RoutineTaskQueue()

        def on_execute(self):

            # Sub-tasks may only be executed in the ACTIVE state, so we
            # make that transition before starting the first repetition.
            self.set_active()

            # Defer to a helper function.
            self.execute_task()

        def on_cancel(self):

            # As before, to avoid race conditions between cancels and
            # our task we will push a callback onto a RoutineTaskQueue
            # to handle cancellations.
            self.tasks.push(self._on_cancel)

        def on_state(self, state_entry):

            # This method handles transitions of the task we're repeating.
            if state_entry.state == beam.tasks.Task.State.CANCELED:

                # This branch indicates that we canceled our task which
                # means that we're handling a cancel request.
                self.set_terminal(beam.tasks.Task.State.CANCELED)
            elif beam.tasks.is_terminal(state_entry.state):

                # This branch indicates that our task terminated, so we
                # count the finished repetition and move on to the next.
                # Counting here, rather than when a repetition starts, keeps
                # a canceled repetition out of the continuation arithmetic.
                self.counter += 1
                self.execute_task()

        def _on_cancel(self):
            if self.counter < self.iterations:

                # Similar to before, only cancel if we still have
                # repetitions to process.
                self.task.cancel()

        def execute_task(self):
            if self.counter >= self.iterations:

                # This branch indicates there are no more iterations left.
                self.set_terminal()
            else:

                # This branch indicates that we need to repeat the task
                # by constructing a new one and executing it.
                self.task = self.task_factory.create()
                self.task.get_publisher().monitor(
                    self.tasks.get_slot(self.on_state))
                self.task.execute()

    # This class is very similar to the HelloTaskFactory.
    class RepetitionTaskFactory(beam.tasks.TaskFactory):
        ITERATIONS = 'iterations'

        def __init__(self, task_factory):
            beam.tasks.TaskFactory.__init__(self)
            self.task_factory = copy.deepcopy(task_factory)
            self.define_property(RepetitionTaskFactory.ITERATIONS, 10)
            self.continuation_task = None

        def create(self):
            if self.continuation_task is None:
                return RepetitionTask(self.task_factory,
                    self.get(RepetitionTaskFactory.ITERATIONS))
            else:
                continuation = RepetitionTask(self.task_factory,
                    self.get(RepetitionTaskFactory.ITERATIONS) -
                    self.continuation_task.counter)
                self.continuation_task = None
                return continuation

        def prepare_continuation(self, task):
            self.continuation_task = task
Now that we have factored out the job of repeating tasks, we can rewrite our HelloTask as follows:
    class HelloTask(beam.tasks.BasicTask):
        '''Prints hello once after the timer expires.'''

        def __init__(self, timer):
            beam.tasks.BasicTask.__init__(self)
            self.timer = timer
            self.tasks = beam.RoutineTaskQueue()

        def on_execute(self):
            self.timer.get_publisher().monitor(self.tasks.get_slot(self.on_timer))
            self.timer.start()
            self.set_active()

        def on_cancel(self):
            self.tasks.push(self._on_cancel)

        def on_timer(self, result):
            if result == beam.threading.Timer.Result.EXPIRED:

                # Only print when the timer expired normally, not when
                # it was canceled.
                print('hello')
                self.set_terminal()
            else:
                self.set_terminal(beam.tasks.Task.State.CANCELED)

        def _on_cancel(self):
            self.timer.cancel()

    class HelloTaskFactory(beam.tasks.TaskFactory):
        '''Builds HelloTasks that print after one second.'''

        def __init__(self, timer_factory):
            beam.tasks.TaskFactory.__init__(self)
            self.timer_factory = timer_factory

        def create(self):
            return HelloTask(self.timer_factory(datetime.timedelta(seconds=1)))
Finally, once these two pieces are in place, we can combine them as follows:
    def main():

        # First build the HelloTaskFactory.
        hello_factory = HelloTaskFactory(beam.threading.LiveTimer)

        # Pass the above factory into the RepetitionTaskFactory.
        # The result is a factory that composes a RepetitionTask with
        # a HelloTask.
        factory = RepetitionTaskFactory(hello_factory)
        factory.set(RepetitionTaskFactory.ITERATIONS, 5)

        # Now we can use the factory similarly to how we used it before.
        task = factory.create()
        task.execute()
        time.sleep(2)
        task.cancel()
        beam.tasks.wait(task)

        # Continue the canceled task with a modified iteration count.
        factory.prepare_continuation(task)
        factory.set(RepetitionTaskFactory.ITERATIONS, 7)
        task = factory.create()
        task.execute()
        beam.tasks.wait(task)

    if __name__ == '__main__':
        main()
In fact, we can take this decomposition one step further by separating, within the HelloTask, the responsibility of printing 'hello' from the responsibility of running a task after a specified period of time. If we so desired, we would write a TimerTask/TimerTaskFactory, and our final composition would be along the lines of RepetitionTaskFactory(TimerTaskFactory(HelloTaskFactory(), beam.threading.LiveTimer)).
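The shape of that three-level composition can be sketched with plain, synchronous stand-ins. None of the classes below are Beam classes: each "task" is simply a callable, waiting is recorded as an event instead of using a real timer, and states and cancellation are left out entirely. The point is only to show how each factory wraps the next.

```python
class HelloFactory:
    '''Builds tasks that only record 'hello'.'''

    def __init__(self, events):
        self.events = events

    def create(self):
        return lambda: self.events.append('hello')


class TimerFactory:
    '''Builds tasks that wait, then run a task from the wrapped factory.'''

    def __init__(self, task_factory, seconds, events):
        self.task_factory = task_factory
        self.seconds = seconds
        self.events = events

    def create(self):
        inner = self.task_factory.create()

        def run():
            # A real implementation would wait on a timer here.
            self.events.append('wait %ds' % self.seconds)
            inner()
        return run


class RepetitionFactory:
    '''Builds tasks that run a fresh wrapped task a number of times.'''

    def __init__(self, task_factory, iterations):
        self.task_factory = task_factory
        self.iterations = iterations

    def create(self):
        def run():
            for _ in range(self.iterations):
                self.task_factory.create()()
        return run


events = []
factory = RepetitionFactory(TimerFactory(HelloFactory(events), 1, events), 2)
factory.create()()
```

After the composed task runs, `events` is `['wait 1s', 'hello', 'wait 1s', 'hello']`. Each layer knows nothing about the others beyond the `create()` method, which is what makes the pieces reusable.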