Tasks

From Spire Trading Inc.
Jump to: navigation, search

Tasks provide a way of encapsulating and managing asynchronous actions in a composable manner. They are primarily used to perform some sort of I/O operation and/or manage a series of sub-tasks. Beam provides several classes to define your own tasks as well as classes that connect tasks to one another.

The task API in Beam is defined by two base classes, the class Beam::Tasks::Task which represents a task, and the class Beam::Tasks::TaskFactory which constructs tasks. In this article we will explore how we can use these two classes in ways that allow us to start, stop, modify and resume asynchronous actions.

Interface

Task

The Task class defines two methods, an Execute() method to start the task and a Cancel() method to request that the task stop. Both operations are asynchronous, meaning that they return immediately and the actual operation of the task takes place in a separate thread of execution. In order to keep track of the progress of a task one must monitor its state which can be any of the following:

State Description
NONE The task has not yet been executed.
INITIALIZING The task is performing some initialization, during this state no sub-tasks may be executed.
ACTIVE The task is running, during this state sub-tasks may be executed.
PENDING_CANCEL A request to cancel the task has been made, no new sub-tasks may be executed.
CANCELED The task was canceled.
FAILED The task could not complete due to an error.
EXPIRED The task could not complete due to a time constraint.
COMPLETE The task completed successfully.

The states CANCELED, FAILED, EXPIRED and COMPLETE all represent terminal states. Before a task can transition to a terminal state, all of its sub tasks must be in a terminal state. Furthermore once a task is in a terminal state the task is not permitted to perform any additional action or transition to a different state.

The current state of the task along with all of its transitions is accessed through its Publisher by calling the GetPublisher() method. The publisher will emit objects of type Beam::Tasks::Task::StateEntry which contains two fields as follows:

Name Type Description
m_state Beam::Tasks::Task::State The state of the task.
m_message std::string A message describing the reason for the transition.

In effect a task begins in the NONE state, is then started via the Execute() method, transitions into the INITIALIZATION state where it performs any setup needed, then proceeds to the ACTIVE state where it performs its operation including executing any required sub-tasks, and then finally terminates all of its sub-tasks and performs any final clean-up before ending in a terminal state. During any point after the INITIALIZATION state it may encounter a cancel request (via the Cancel() method) or an error, either of which should result in cleaning-up and terminating its sub-tasks before transitioning to the FAILED or CANCELED state.

Task Factory

The task factory is responsible for creating new tasks by invoking its Create() method. In addition to this a task factory also keeps track of the parameters and state needed to create tasks. Setting a task's parameter is done by calling the factory's Set(name, value) method and retrieving a parameter is done via the Get(name) method.

Continuations

One additional operation that task factories perform is constructing what is called a continuation task. Continuations are a mechanism that allow us to resume a task that previously terminated, but more than that they also allow us to modify the parameters of that task. In a sense, a continuation lets us take a terminated task, modify it, and then continue running it with those modifications.

To do this, task factories have a method PrepareContinuation(task). To use it you pass into it a task that has terminated, then you modify its properties using the Set(name, value) method, and then finally you invoke the Create() method to get a newly constructed task that represents the continuation of the old task.

Not all tasks support continuations, those that do not will throw a Beam::NotSupportedException.

BasicTask

As a great deal of care must be taken to ensure that tasks properly transition from State to State, including managing sub-tasks, handling cancel requests etc... Beam provides the Beam::Tasks::BasicTask as a base class which can be inherited from to take care of much of the work needed to write a proper task. For example it ensures that upon a cancel request that the task transitions into the PENDING_CANCEL state and terminates all managed sub-tasks.

To make use of a the BasicTask you need only implement the OnExecute() method to start your task, and the OnCancel() method to handle cancelling your class.

Example

To showcase a simple example, let's build a task that prints "hello" every second a given number of times. We will write a HelloTask class which inherits from BasicTask, and a HelloTaskFactory.

First Attempt

  1 import beam
  2 import datetime
  3 import time
  4 
  5 class HelloTask(beam.tasks.BasicTask):
  6   '''Prints hello a specified number of times.'''
  7 
  8   def __init__(self, timer, iterations):
  9     ''' Constructs this task.
 10     :param timer: The Timer used to wait between successive prints.
 11     :param iterations: The number of times to print.
 12     '''
 13 
 14     # We must make sure to initialize the base class.
 15     beam.tasks.BasicTask.__init__(self)
 16     self.timer = timer
 17     self.iterations = iterations
 18 
 19     # The number of times we've printed so far.
 20     self.counter = 0
 21 
 22     # Used to handle timer callbacks.
 23     self.tasks = beam.RoutineTaskQueue()
 24 
 25   # This overrides BasicTask.on_execute.
 26   # When called we can assume that our Task is in the INITIALIZATION state.
 27   def on_execute(self):
 28 
 29     # To initialize our task we will monitor our timer and then start the timer.
 30     self.timer.get_publisher().monitor(self.tasks.get_slot(self.on_timer))
 31     self.timer.start()
 32 
 33     # Once the initialization is complete we transition to the ACTIVE state.
 34     self.set_active()
 35 
 36   # This overrides BasicTask.on_cancel.
 37   # When called we can assume that our Task is in the PENDING_CANCEL state.
 38   # No new sub-tasks may be executed.
 39   def on_cancel(self):
 40 
 41     # In order to synchronize handling the cancel operation with the timer,
 42     # we will push a helper function onto our RoutineTaskQueue.
 43     # This ensures no race-conditions take place between the timer and
 44     # the cancel request.
 45     self.tasks.push(self._on_cancel)
 46 
 47   # This handles the timer expiry.
 48   def on_timer(self, result):
 49     if result == beam.threading.Timer.Result.EXPIRED:
 50 
 51       # This branch implies that our timer expired normally.
 52       print('hello')
 53       self.counter += 1
 54       if self.counter >= self.iterations:
 55 
 56         # There is nothing more to print, so transition to
 57         # a terminal state, which by default is the COMPLETE state.
 58         self.set_terminal()
 59       else:
 60 
 61         # There are still further iterations, restart the timer.
 62         self.timer.start()
 63     else:
 64 
 65         # This branch implies that we canceled the timer in response
 66         # to a cancel request.
 67         # In this case we set our state to CANCELED.
 68         self.set_terminal(beam.tasks.Task.State.CANCELED)
 69 
 70   def _on_cancel(self):
 71     if self.counter < self.iterations:
 72 
 73       # Only cancel the timer if there are iterations remaining.
 74       self.timer.cancel()
 75 
 76 class HelloTaskFactory(beam.tasks.TaskFactory):
 77   '''Builds HelloTasks.'''
 78 
 79   # Typically the parameters that get passed into the Task are
 80   # defined as static constant strings in the TaskFactory.
 81   # This makes it easier to identify the properties of a task
 82   # as well as reference them (avoiding potential typos).
 83   ITERATIONS = 'iterations'
 84 
 85   def __init__(self, timer_factory):
 86     '''Constructs a HelloTaskFactory.
 87     :param timer_factory: Used to construct Timers.
 88     '''
 89 
 90     # Factories will typically inherit from TaskFactory as a base class.
 91     beam.tasks.TaskFactory.__init__(self)
 92     self.timer_factory = timer_factory
 93 
 94     # The constructor should define all the properties and default
 95     # values for those properties.
 96     self.define_property(HelloTaskFactory.ITERATIONS, 10)
 97 
 98     # This variable keeps track of continuations.
 99     self.continuation_task = None
100 
101   # This overrides the TaskFactory create method.
102   def create(self):
103 
104     if self.continuation_task is None:
105 
106       # We are not creating a continuation task, so all we need to do is
107       # construct a HelloTask using the ITERATIONS property defined above.
108       return HelloTask(self.timer_factory(datetime.timedelta(seconds = 1)),
109         self.get(HelloTaskFactory.ITERATIONS))
110     else:
111 
112       # We are creating a continuation task.  The continuation of a HelloTask of
113       # N iterations that has already printed C times is basically a HelloTask
114       # that prints N - C times.
115       continuation = HelloTask(
116         self.timer_factory(datetime.timedelta(seconds = 1)),
117         self.get(HelloTaskFactory.ITERATIONS) - self.continuation_task.counter)
118       self.continuation_task = None
119       return continuation
120 
121   # This overrides the TaskFactory prepare_continuation method.
122   def prepare_continuation(self, task):
123 
124     # We store the task to continue.
125     self.continuation_task = task
126 
127 def main():
128 
129   # Construct the HelloTaskFactory using the LiveTimer for 5 iterations.
130   factory = HelloTaskFactory(beam.threading.LiveTimer)
131   factory.set(HelloTaskFactory.ITERATIONS, 5)
132   task = factory.create()
133   task.execute()
134 
135   # Let's sleep for 2 seconds before canceling the task.
136   # It should print hello twice.
137   time.sleep(2)
138   task.cancel()
139 
140   # Wait for the task to enter the CANCELED state.
141   beam.tasks.wait(task)
142 
143   # Build the continuation task and execute it.  To do this we first
144   # call prepare_continuation, then we make our modifications using the set
145   # method, then we create the task and execute it.
146   factory.prepare_continuation(task)
147   factory.set(HelloTaskFactory.ITERATIONS, 7)
148   task = factory.create()
149 
150   # We expect it to print hello three times before coming to an end.
151   task.execute()
152   beam.tasks.wait(task)
153 
154 if __name__ == '__main__':
155   main()

Decomposition

The above example works, but upon reflection we should notice that this Task is responsible for two distinct things. One is the responsibility of printing, and the other is the responsibility of repeating. Given that the purpose of tasks is to be able to compose asynchronous operations we should separate these two responsibilities from one another. To do that we will change our HelloTask so that all it does is print hello after a specified time period, followed by a RepetitionTask which repeats a task a specified number of times. The benefit of this is that our RepetitionTask can be reused to repeat any task whatsoever down the road.

We will define it as follows:

  1 class RepetitionTask(beam.tasks.BasicTask):
  2   '''Repeats a Task a specified number of times.'''
  3 
  4   def __init__(self, task_factory, iterations):
  5     '''Constructs the Task.
  6     :param task_factory: Builds the Task to repeat.
  7     :param iterations: The number of times to repeat the Task.
  8     '''
  9     beam.tasks.BasicTask.__init__(self)
 10 
 11     # We should always make a deep copy of factories in order to
 12     # avoid modifying a factory belonging to another task or having
 13     # another task modify our factory.
 14     self.task_factory = copy.deepcopy(task_factory)
 15     self.iterations = iterations
 16     self.counter = 0
 17 
 18     # This stores the task currently being executed.
 19     self.task = None
 20 
 21     # This is used to handle callbacks from our tasks.
 22     self.tasks = beam.RoutineTaskQueue()
 23 
 24   def on_execute(self):
 25 
 26     # Defer to a helper function.
 27     self.execute_task()
 28 
 29   def on_cancel(self):
 30 
 31     # As before, to avoid race conditions between cancels and
 32     # our task we will push a callback onto a RoutineTaskQueue
 33     # to handle cancellations.
 34     self.tasks.push(self._on_cancel)
 35 
 36   def on_state(self, state_entry):
 37 
 38     # This method handles transitions of the task we're repeating.
 39     if state_entry.state == beam.tasks.Task.State.CANCELED:
 40 
 41       # This branch indicates that we canceled our task which
 42       # means that we're handling a cancel request.
 43       self.set_terminal(beam.tasks.Task.State.CANCELED)
 44     elif beam.tasks.is_terminal(state_entry.state):
 45 
 46       # This branch indicates that our task terminated and
 47       # hence we should repeat.
 48       self.execute_task()
 49 
 50   def _on_cancel(self):
 51     if self.counter < self.iterations:
 52 
 53       # Similar to before, only cancel if we still have
 54       # repetitions to process.
 55       self.task.cancel()
 56 
 57   def execute_task(self):
 58     if self.counter >= self.iterations:
 59 
 60       # This branch indicates there are no more iterations left.
 61       self.set_terminal()
 62     else:
 63 
 64       # This branch indicates that we need to repeat the task
 65       # by constructing a new one and executing it.
 66       self.counter += 1
 67       self.task = self.task_factory.create()
 68       self.task.get_publisher().monitor(self.tasks.get_slot(self.on_state))
 69       self.task.execute()
 70 
 71 # This class is very similar to the HelloTaskFactory.
 72 class RepetitionTaskFactory(beam.tasks.TaskFactory):
 73   ITERATIONS = 'iterations'
 74 
 75   def __init__(self, task_factory):
 76     beam.tasks.TaskFactory.__init__(self)
 77     self.task_factory = copy.deepcopy(task_factory)
 78     self.define_property(RepetitionTaskFactory.ITERATIONS, 10)
 79     self.continuation_task = None
 80 
 81   def create(self):
 82     if self.continuation_task is None:
 83       return RepetitionTask(self.task_factory,
 84         self.get(RepetitionTaskFactory.ITERATIONS))
 85     else:
 86       continuation = RepetitionTask(self.task_factory,
 87         self.get(RepetitionTaskFactory.ITERATIONS) -
 88         self.continuation_task.counter)
 89       self.continuation_task = None
 90       return continuation
 91 
 92   def prepare_continuation(self, task):
 93     self.continuation_task = task
 94 
 95 
 96 Now that we have factored out the job of repeating tasks we can rewrite our HelloTask as follows:
 97 
 98 class HelloTask(beam.tasks.BasicTask):
 99   def __init__(self, timer):
100     beam.tasks.BasicTask.__init__(self)
101     self.timer = timer
102     self.tasks = beam.RoutineTaskQueue()
103 
104   def on_execute(self):
105     self.timer.get_publisher().monitor(self.tasks.get_slot(self.on_timer))
106     self.timer.start()
107     self.set_active()
108 
109   def on_cancel(self):
110     self.tasks.push(self._on_cancel)
111 
112   def on_timer(self, result):
113     print 'hello'
114     if result == beam.threading.Timer.Result.EXPIRED:
115       self.set_terminal()
116     else:
117       self.set_terminal(beam.tasks.Task.State.CANCELED)
118 
119   def _on_cancel(self):
120     self.timer.cancel()
121 
122 class HelloTaskFactory(beam.tasks.TaskFactory):
123   def __init__(self, timer_factory):
124     beam.tasks.TaskFactory.__init__(self)
125     self.timer_factory = timer_factory
126 
127   def create(self):
128     return HelloTask(self.timer_factory(datetime.timedelta(seconds = 1)))
129 
130 Finally once these two pieces are in place we can combine them as follows:
131 
132 def main():
133 
134   # First build the HelloTaskFactory
135   hello_factory = HelloTaskFactory(beam.threading.LiveTimer)
136 
137   # Pass the above factory into the RepetitionTaskFactory.
138   # The result is a factory that composes a RepetitionTask with
139   # a HelloTask.
140   factory = RepetitionTaskFactory(hello_factory)
141   factory.set(RepetitionTaskFactory.ITERATIONS, 5)
142 
143   # Now we can use the factory similarly to how we used it before.
144   task = factory.create()
145   task.execute()
146   time.sleep(2)
147   task.cancel()
148   beam.tasks.wait(task)
149   factory.prepare_continuation(task)
150   factory.set(RepetitionTaskFactory.ITERATIONS, 7)
151   task = factory.create()
152   task.execute()
153   beam.tasks.wait(task)
154 
155 if __name__ == '__main__':
156   main()

In actuality, we can take this decomposition one step further by factoring out from the HelloTask the responsibility of printing 'hello' with the responsibility of running a task after a specified period of time. If we so desired we would write a TimerTask/TimerTaskFactory and then our final composition would be along the lines of a RepetitionTaskFactory(TimerTaskFactory(HelloTaskFactory(), beam.threading.LiveTimer)).