From Spire Trading Inc.

Tasks provide a way of encapsulating and managing asynchronous actions in a composable manner. They are primarily used to perform some sort of I/O operation and/or manage a series of sub-tasks. Beam provides several classes to define your own tasks as well as classes that connect tasks to one another.

The task API in Beam is defined by two base classes: Beam::Tasks::Task, which represents a task, and Beam::Tasks::TaskFactory, which constructs tasks. In this article we will explore how these two classes allow us to start, stop, modify and resume asynchronous actions.



The Task class defines two methods: an Execute() method to start the task and a Cancel() method to request that the task stop. Both operations are asynchronous, meaning that they return immediately and the actual operation of the task takes place in a separate thread of execution. To keep track of the progress of a task, one must monitor its state, which can be any of the following:

State           Description
NONE            The task has not yet been executed.
INITIALIZING    The task is performing some initialization; during this state no sub-tasks may be executed.
ACTIVE          The task is running; during this state sub-tasks may be executed.
PENDING_CANCEL  A request to cancel the task has been made; no new sub-tasks may be executed.
CANCELED        The task was canceled.
FAILED          The task could not complete due to an error.
EXPIRED         The task could not complete due to a time constraint.
COMPLETE        The task completed successfully.

The states CANCELED, FAILED, EXPIRED and COMPLETE are the terminal states. Before a task can transition to a terminal state, all of its sub-tasks must be in a terminal state. Furthermore, once a task is in a terminal state it is not permitted to perform any additional action or transition to a different state.
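The rules about terminal states can be sketched in a few lines of plain Python. This is only an illustration that does not use Beam: the State enum, is_terminal and can_terminate below are hypothetical stand-ins written for this example, not the library's own definitions.

```python
import enum


class State(enum.Enum):
  '''Hypothetical mirror of the task states listed above.'''
  NONE = enum.auto()
  INITIALIZING = enum.auto()
  ACTIVE = enum.auto()
  PENDING_CANCEL = enum.auto()
  CANCELED = enum.auto()
  FAILED = enum.auto()
  EXPIRED = enum.auto()
  COMPLETE = enum.auto()


# The four states from which no further transition is permitted.
TERMINAL_STATES = frozenset(
  {State.CANCELED, State.FAILED, State.EXPIRED, State.COMPLETE})


def is_terminal(state):
  '''Returns True if the state is one of the terminal states.'''
  return state in TERMINAL_STATES


def can_terminate(sub_task_states):
  '''A task may only terminate once all of its sub-tasks have terminated.'''
  return all(is_terminal(state) for state in sub_task_states)
```

For instance, can_terminate([State.COMPLETE, State.ACTIVE]) is False because one sub-task is still running, whereas a task with no sub-tasks at all may terminate immediately.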

The current state of the task, along with all of its transitions, is accessed through its publisher, which is obtained by calling the GetPublisher() method. The publisher emits objects of type Beam::Tasks::Task::StateEntry, each of which contains two fields as follows:

Name       Type                      Description
m_state    Beam::Tasks::Task::State  The state of the task.
m_message  std::string               A message describing the reason for the transition.

In effect, a task begins in the NONE state and is started via the Execute() method. It transitions into the INITIALIZING state, where it performs any setup needed, then proceeds to the ACTIVE state, where it performs its operation, including executing any required sub-tasks. Finally, it terminates all of its sub-tasks and performs any final clean-up before ending in a terminal state. At any point after the INITIALIZING state it may encounter a cancel request (via the Cancel() method) or an error, either of which should result in cleaning up and terminating its sub-tasks before transitioning to the CANCELED or FAILED state, respectively.
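As a rough illustration of this life cycle, the sketch below records each transition as a (state, message) entry and refuses any transition out of a terminal state. It is plain Python and does not use Beam: StateLog, its transition method and the message strings are hypothetical names chosen for this example, and the only rule it enforces is the one stated above about terminal states.

```python
import collections
import enum


class State(enum.Enum):
  NONE = enum.auto()
  INITIALIZING = enum.auto()
  ACTIVE = enum.auto()
  PENDING_CANCEL = enum.auto()
  CANCELED = enum.auto()
  FAILED = enum.auto()
  EXPIRED = enum.auto()
  COMPLETE = enum.auto()


TERMINAL_STATES = frozenset(
  {State.CANCELED, State.FAILED, State.EXPIRED, State.COMPLETE})

# Hypothetical counterpart of a StateEntry: a state plus a message.
StateEntry = collections.namedtuple('StateEntry', ['state', 'message'])


class StateLog:
  '''Records the transitions of a single (imaginary) task.'''

  def __init__(self):
    self.entries = [StateEntry(State.NONE, '')]

  @property
  def state(self):
    return self.entries[-1].state

  def transition(self, state, message=''):
    # Once terminated, a task may not transition to a different state.
    if self.state in TERMINAL_STATES:
      raise RuntimeError('The task has already terminated.')
    self.entries.append(StateEntry(state, message))


def run_and_cancel():
  '''Walks through the life cycle of a task that gets canceled.'''
  log = StateLog()
  log.transition(State.INITIALIZING)
  log.transition(State.ACTIVE)
  log.transition(State.PENDING_CANCEL, 'Cancel requested.')
  log.transition(State.CANCELED, 'Sub-tasks terminated.')
  return log
```

Calling transition again on the log returned by run_and_cancel() raises a RuntimeError, mirroring the rule that a terminated task performs no further transitions.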

Task Factory

The task factory is responsible for creating new tasks by invoking its Create() method. In addition to this a task factory also keeps track of the parameters and state needed to create tasks. Setting a task's parameter is done by calling the factory's Set(name, value) method and retrieving a parameter is done via the Get(name) method.
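The idea of a factory that carries named parameters can be sketched without Beam as follows. SketchTaskFactory and GreetingFactory are hypothetical classes written for this example, the "task" they create is just a callable, and rejecting undefined parameter names is an assumption of the sketch rather than documented behavior.

```python
class SketchTaskFactory:
  '''A stand-in for a task factory that stores named parameters.'''

  def __init__(self):
    self._properties = {}

  def define_property(self, name, default):
    # Declares a parameter along with its default value.
    self._properties[name] = default

  def get(self, name):
    return self._properties[name]

  def set(self, name, value):
    # Assumption: only parameters that were defined may be set.
    if name not in self._properties:
      raise KeyError(name)
    self._properties[name] = value

  def create(self):
    raise NotImplementedError()


class GreetingFactory(SketchTaskFactory):
  GREETING = 'greeting'

  def __init__(self):
    super().__init__()
    self.define_property(GreetingFactory.GREETING, 'hello')

  def create(self):
    # Capture the current parameter value so that later calls to set()
    # do not affect tasks that were already created.
    greeting = self.get(GreetingFactory.GREETING)
    return lambda: greeting
```

A caller would construct a GreetingFactory, optionally call set(GreetingFactory.GREETING, ...), and then call create() as many times as needed. Each created task reflects the parameters at the time of its creation.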


One additional operation that task factories perform is constructing what is called a continuation task. Continuations are a mechanism that allows us to resume a task that previously terminated and, beyond that, to modify the parameters of that task. In a sense, a continuation lets us take a terminated task, modify it, and then continue running it with those modifications.

To do this, task factories have a method PrepareContinuation(task). To use it you pass into it a task that has terminated, then you modify its properties using the Set(name, value) method, and then finally you invoke the Create() method to get a newly constructed task that represents the continuation of the old task.

Not all tasks support continuations; those that do not will throw a Beam::NotSupportedException.
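The three steps (prepare the continuation, modify the parameters, create the task) can be sketched in plain Python as follows. None of this is Beam code: CountTask, CountTaskFactory, OneShotFactory and NotSupportedError are hypothetical names used only to illustrate the pattern, and the task here is a simple synchronous counter.

```python
class NotSupportedError(Exception):
  '''Hypothetical stand-in for Beam::NotSupportedException.'''


class CountTask:
  '''Counts up to a target; it may be stopped before reaching it.'''

  def __init__(self, target):
    self.target = target
    self.count = 0

  def step(self, steps):
    self.count = min(self.target, self.count + steps)


class CountTaskFactory:
  def __init__(self, target):
    self.target = target
    self.previous = None

  def prepare_continuation(self, task):
    # Remember the terminated task whose progress we want to resume.
    self.previous = task

  def create(self):
    done = 0
    if self.previous is not None:
      # Resume: only the work not yet performed remains.
      done = self.previous.count
      self.previous = None
    return CountTask(max(0, self.target - done))


class OneShotFactory:
  '''A factory whose tasks cannot be continued.'''

  def create(self):
    return CountTask(1)

  def prepare_continuation(self, task):
    raise NotSupportedError('Continuations are not supported.')
```

If a CountTask with a target of 5 stops after 2 steps and the factory's target is then raised to 7, the continuation has 5 steps left to perform. The factory forgets the previous task once the continuation has been created, so a subsequent create() builds an ordinary task again.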


A great deal of care must be taken to ensure that tasks properly transition from state to state, including managing sub-tasks, handling cancel requests, and so on. To help with this, Beam provides the class Beam::Tasks::BasicTask, a base class that can be inherited from and that takes care of much of the work needed to write a proper task. For example, it ensures that upon a cancel request the task transitions into the PENDING_CANCEL state and terminates all managed sub-tasks.

To make use of BasicTask you need only implement the OnExecute() method to start your task and the OnCancel() method to handle canceling your task.
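The division of labor between such a base class and its subclasses can be sketched as follows. This is a simplified, synchronous illustration in plain Python and not Beam's implementation: SketchBasicTask and WaitForCancelTask are hypothetical, sub-task management is omitted, and the guards on each transition are assumptions made for the example.

```python
import enum


class State(enum.Enum):
  NONE = enum.auto()
  INITIALIZING = enum.auto()
  ACTIVE = enum.auto()
  PENDING_CANCEL = enum.auto()
  CANCELED = enum.auto()
  FAILED = enum.auto()
  EXPIRED = enum.auto()
  COMPLETE = enum.auto()


TERMINAL_STATES = frozenset(
  {State.CANCELED, State.FAILED, State.EXPIRED, State.COMPLETE})


class SketchBasicTask:
  '''Handles state bookkeeping; subclasses supply the behavior.'''

  def __init__(self):
    self.state = State.NONE

  def execute(self):
    # A task can only be started once.
    if self.state is not State.NONE:
      return
    self.state = State.INITIALIZING
    self.on_execute()

  def cancel(self):
    # Ignore cancel requests unless the task is currently running.
    if self.state not in (State.INITIALIZING, State.ACTIVE):
      return
    self.state = State.PENDING_CANCEL
    self.on_cancel()

  def set_active(self):
    if self.state is State.INITIALIZING:
      self.state = State.ACTIVE

  def set_terminal(self, state=State.COMPLETE):
    # Terminal states are final.
    if self.state not in TERMINAL_STATES:
      self.state = state

  def on_execute(self):
    raise NotImplementedError()

  def on_cancel(self):
    raise NotImplementedError()


class WaitForCancelTask(SketchBasicTask):
  '''A task that stays active until it is canceled.'''

  def on_execute(self):
    self.set_active()

  def on_cancel(self):
    self.set_terminal(State.CANCELED)
```

The subclass only decides when to become active and how to react to a cancel request, while the base class guarantees that the PENDING_CANCEL state is entered first and that terminal states are never left.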


To showcase a simple example, let's build a task that prints "hello" every second a given number of times. We will write a HelloTask class which inherits from BasicTask, and a HelloTaskFactory.

First Attempt

    import datetime
    import time

    import beam

    class HelloTask(beam.tasks.BasicTask):
      '''Prints hello a specified number of times.'''

      def __init__(self, timer, iterations):
        '''Constructs this task.
        :param timer: The Timer used to wait between successive prints.
        :param iterations: The number of times to print.
        '''

        # We must make sure to initialize the base class.
        beam.tasks.BasicTask.__init__(self)
        self.timer = timer
        self.iterations = iterations

        # The number of times we've printed so far.
        self.counter = 0

        # Used to handle timer callbacks.
        self.tasks = beam.RoutineTaskQueue()

      # This overrides BasicTask.on_execute.
      # When called we can assume that our Task is in the INITIALIZING state.
      def on_execute(self):

        # To initialize our task we will monitor our timer and then start the timer.
        self.timer.get_publisher().monitor(self.tasks.get_slot(self.on_timer))
        self.timer.start()

        # Once the initialization is complete we transition to the ACTIVE state.
        self.set_active()

      # This overrides BasicTask.on_cancel.
      # When called we can assume that our Task is in the PENDING_CANCEL state.
      # No new sub-tasks may be executed.
      def on_cancel(self):

        # In order to synchronize handling the cancel operation with the timer,
        # we will push a helper function onto our RoutineTaskQueue.
        # This ensures no race-conditions take place between the timer and
        # the cancel request.
        self.tasks.push(self._on_cancel)

      # This handles the timer expiry.
      def on_timer(self, result):
        if result == beam.threading.Timer.Result.EXPIRED:

          # This branch implies that our timer expired normally.
          print('hello')
          self.counter += 1
          if self.counter >= self.iterations:

            # There is nothing more to print, so transition to
            # a terminal state, which by default is the COMPLETE state.
            self.set_terminal()
          else:

            # There are still further iterations, restart the timer.
            self.timer.start()
        else:

          # This branch implies that we canceled the timer in response
          # to a cancel request.
          # In this case we set our state to CANCELED.
          self.set_terminal(beam.tasks.Task.State.CANCELED)

      def _on_cancel(self):
        if self.counter < self.iterations:

          # Only cancel the timer if there are iterations remaining.
          self.timer.cancel()

    class HelloTaskFactory(beam.tasks.TaskFactory):
      '''Builds HelloTasks.'''

      # Typically the parameters that get passed into the Task are
      # defined as static constant strings in the TaskFactory.
      # This makes it easier to identify the properties of a task
      # as well as reference them (avoiding potential typos).
      ITERATIONS = 'iterations'

      def __init__(self, timer_factory):
        '''Constructs a HelloTaskFactory.
        :param timer_factory: Used to construct Timers.
        '''

        # Factories will typically inherit from TaskFactory as a base class.
        beam.tasks.TaskFactory.__init__(self)
        self.timer_factory = timer_factory

        # The constructor should define all the properties and default
        # values for those properties.
        self.define_property(HelloTaskFactory.ITERATIONS, 10)

        # This variable keeps track of continuations.
        self.continuation_task = None

      # This overrides the TaskFactory create method.
      def create(self):

        if self.continuation_task is None:

          # We are not creating a continuation task, so all we need to do is
          # construct a HelloTask using the ITERATIONS property defined above.
          return HelloTask(self.timer_factory(datetime.timedelta(seconds=1)),
            self.get(HelloTaskFactory.ITERATIONS))
        else:

          # We are creating a continuation task.  The continuation of a HelloTask of
          # N iterations that has already printed C times is basically a HelloTask
          # that prints N - C times.
          continuation = HelloTask(
            self.timer_factory(datetime.timedelta(seconds=1)),
            self.get(HelloTaskFactory.ITERATIONS) - self.continuation_task.counter)
          self.continuation_task = None
          return continuation

      # This overrides the TaskFactory prepare_continuation method.
      def prepare_continuation(self, task):

        # We store the task to continue.
        self.continuation_task = task

    def main():

      # Construct the HelloTaskFactory using the LiveTimer for 5 iterations.
      factory = HelloTaskFactory(beam.threading.LiveTimer)
      factory.set(HelloTaskFactory.ITERATIONS, 5)
      task = factory.create()
      task.execute()

      # Let's sleep for 2 seconds before canceling the task.
      # It should print hello roughly twice, depending on timing.
      time.sleep(2)
      task.cancel()

      # Wait for the task to enter the CANCELED state.
      beam.tasks.wait(task)

      # Build the continuation task and execute it.  To do this we first
      # call prepare_continuation, then we make our modifications using the set
      # method, then we create the task and execute it.
      factory.prepare_continuation(task)
      factory.set(HelloTaskFactory.ITERATIONS, 7)
      task = factory.create()

      # The continuation prints 7 - C times, where C is the number of times the
      # canceled task printed.  If it printed twice, we expect hello to be
      # printed five more times before coming to an end.
      task.execute()
      beam.tasks.wait(task)

    if __name__ == '__main__':
      main()


The above example works, but upon reflection we should notice that this task is responsible for two distinct things: printing and repeating. Given that the purpose of tasks is to compose asynchronous operations, we should separate these two responsibilities from one another. To do that we will change our HelloTask so that all it does is print hello after a specified time period, and introduce a RepetitionTask which repeats a task a specified number of times. The benefit of this is that our RepetitionTask can be reused to repeat any task whatsoever down the road.

We will define it as follows:

    import copy
    import datetime
    import time

    import beam

    class RepetitionTask(beam.tasks.BasicTask):
      '''Repeats a Task a specified number of times.'''

      def __init__(self, task_factory, iterations):
        '''Constructs the Task.
        :param task_factory: Builds the Task to repeat.
        :param iterations: The number of times to repeat the Task.
        '''
        beam.tasks.BasicTask.__init__(self)

        # We should always make a deep copy of factories in order to
        # avoid modifying a factory belonging to another task or having
        # another task modify our factory.
        self.task_factory = copy.deepcopy(task_factory)
        self.iterations = iterations

        # The number of repetitions that have finished so far.
        self.counter = 0

        # This stores the task currently being executed.
        self.task = None

        # This is used to handle callbacks from our tasks.
        self.tasks = beam.RoutineTaskQueue()

      def on_execute(self):

        # Sub-tasks may only be executed in the ACTIVE state, so we
        # transition to it before deferring to a helper function.
        self.set_active()
        self.execute_task()

      def on_cancel(self):

        # As before, to avoid race conditions between cancels and
        # our task we will push a callback onto a RoutineTaskQueue
        # to handle cancellations.
        self.tasks.push(self._on_cancel)

      def on_state(self, state_entry):

        # This method handles transitions of the task we're repeating.
        if state_entry.state == beam.tasks.Task.State.CANCELED:

          # This branch indicates that we canceled our task which
          # means that we're handling a cancel request.
          self.set_terminal(beam.tasks.Task.State.CANCELED)
        elif beam.tasks.is_terminal(state_entry.state):

          # This branch indicates that our task terminated on its own.
          # We count the finished repetition here, rather than when the
          # task starts, so that a canceled repetition is not counted
          # and a cancel during the final repetition is not ignored.
          self.counter += 1
          self.execute_task()

      def _on_cancel(self):
        if self.counter < self.iterations:

          # Similar to before, only cancel if we still have
          # repetitions to process.
          self.task.cancel()

      def execute_task(self):
        if self.counter >= self.iterations:

          # This branch indicates there are no more iterations left.
          self.set_terminal()
        else:

          # This branch indicates that we need to repeat the task
          # by constructing a new one and executing it.
          self.task = self.task_factory.create()
          self.task.get_publisher().monitor(self.tasks.get_slot(self.on_state))
          self.task.execute()

    # This class is very similar to the HelloTaskFactory.
    class RepetitionTaskFactory(beam.tasks.TaskFactory):
      ITERATIONS = 'iterations'

      def __init__(self, task_factory):
        beam.tasks.TaskFactory.__init__(self)
        self.task_factory = copy.deepcopy(task_factory)
        self.define_property(RepetitionTaskFactory.ITERATIONS, 10)
        self.continuation_task = None

      def create(self):
        if self.continuation_task is None:
          return RepetitionTask(self.task_factory,
            self.get(RepetitionTaskFactory.ITERATIONS))
        else:
          continuation = RepetitionTask(self.task_factory,
            self.get(RepetitionTaskFactory.ITERATIONS) -
            self.continuation_task.counter)
          self.continuation_task = None
          return continuation

      def prepare_continuation(self, task):
        self.continuation_task = task


    # Now that we have factored out the job of repeating tasks we can rewrite our
    # HelloTask as follows:

    class HelloTask(beam.tasks.BasicTask):
      def __init__(self, timer):
        beam.tasks.BasicTask.__init__(self)
        self.timer = timer
        self.tasks = beam.RoutineTaskQueue()

      def on_execute(self):
        self.timer.get_publisher().monitor(self.tasks.get_slot(self.on_timer))
        self.timer.start()
        self.set_active()

      def on_cancel(self):
        self.tasks.push(self._on_cancel)

      def on_timer(self, result):
        if result == beam.threading.Timer.Result.EXPIRED:

          # Only print when the timer expired normally, not when it
          # was canceled.
          print('hello')
          self.set_terminal()
        else:
          self.set_terminal(beam.tasks.Task.State.CANCELED)

      def _on_cancel(self):
        self.timer.cancel()

    class HelloTaskFactory(beam.tasks.TaskFactory):
      def __init__(self, timer_factory):
        beam.tasks.TaskFactory.__init__(self)
        self.timer_factory = timer_factory

      def create(self):
        return HelloTask(self.timer_factory(datetime.timedelta(seconds=1)))

    # Finally once these two pieces are in place we can combine them as follows:

    def main():

      # First build the HelloTaskFactory
      hello_factory = HelloTaskFactory(beam.threading.LiveTimer)

      # Pass the above factory into the RepetitionTaskFactory.
      # The result is a factory that composes a RepetitionTask with
      # a HelloTask.
      factory = RepetitionTaskFactory(hello_factory)
      factory.set(RepetitionTaskFactory.ITERATIONS, 5)

      # Now we can use the factory similarly to how we used it before.
      task = factory.create()
      task.execute()
      time.sleep(2)
      task.cancel()
      beam.tasks.wait(task)
      factory.prepare_continuation(task)
      factory.set(RepetitionTaskFactory.ITERATIONS, 7)
      task = factory.create()
      task.execute()
      beam.tasks.wait(task)

    if __name__ == '__main__':
      main()

In fact, we can take this decomposition one step further by separating, within the HelloTask, the responsibility of printing 'hello' from the responsibility of running a task after a specified period of time. If we so desired, we could write a TimerTask/TimerTaskFactory, and our final composition would then be along the lines of RepetitionTaskFactory(TimerTaskFactory(HelloTaskFactory(), beam.threading.LiveTimer)).
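The shape of such a three-layer composition can be sketched in plain Python. This deliberately ignores states, cancellation and threading, and it does not use Beam: HelloFactory, DelayFactory and RepeatFactory are hypothetical names, each "task" is just a callable, and the sleep function is injected so the sketch can run without actually waiting.

```python
class HelloFactory:
  '''Builds tasks whose only job is to emit 'hello'.'''

  def __init__(self, output):
    self.output = output

  def create(self):
    return lambda: self.output.append('hello')


class DelayFactory:
  '''Builds tasks that wait and then run the task they wrap.'''

  def __init__(self, task_factory, delay, sleep):
    self.task_factory = task_factory
    self.delay = delay
    self.sleep = sleep

  def create(self):
    task = self.task_factory.create()

    def run():
      self.sleep(self.delay)
      task()
    return run


class RepeatFactory:
  '''Builds tasks that run a freshly created task several times.'''

  def __init__(self, task_factory, iterations):
    self.task_factory = task_factory
    self.iterations = iterations

  def create(self):
    def run():
      for _ in range(self.iterations):
        self.task_factory.create()()
    return run


def compose(output, delays):
  # Mirrors the nesting described above: repeat(delay(hello)).
  return RepeatFactory(
    DelayFactory(HelloFactory(output), 1, delays.append), 3)
```

Each layer knows only about the factory it wraps, so any one of them can be swapped out or reused in a different composition without touching the others. Passing time.sleep instead of delays.append would make the delay real.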