A general-purpose Task and TaskQueue for running tasks with
dependencies and failure/retry, potentially in parallel.
*Latest release 20230401*:
Add missing requirement to DISTINFO.
## Class `BaseTask(cs.fsm.FSM, cs.gvutils.DOTNodeMixin, cs.resources.RunStateMixin)`
A base class subclassing `cs.fsm.FSM` with a `RunStateMixin`.
Note that this class and the `FSM` base class do not provide
a `FSM_DEFAULT_STATE` attribute; a default `state` value of
`None` will leave `.fsm_state` _unset_.
This behaviour is chosen mostly to support subclasses
with unusual behaviour, particularly Django's `Model` class,
whose `refresh_from_db` method does not seem to refresh fields
which already exist; setting `.fsm_state` from a
`FSM_DEFAULT_STATE` class attribute therefore breaks this method.
Subclasses of this class and `Model` should _not_ provide a
`FSM_DEFAULT_STATE` attribute, instead relying on the field
definition to provide this default in the usual way.
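A simplified sketch of why a `None` default matters, using hypothetical stand-in classes (`MiniFSM`, `EagerFSM` and the `field_backed` helper) rather than the real `cs.fsm.FSM` or Django's `Model`. It assumes the subclass fills in its state before the base initialiser runs, roughly as an ORM populates a field; a base class that always applies a class-level default clobbers that value, while one that leaves `.fsm_state` alone when `state is None` does not.

```python
class MiniFSM:
    """Hypothetical stand-in for an FSM base class with no FSM_DEFAULT_STATE."""

    def __init__(self, state=None):
        # state=None leaves .fsm_state untouched, so a subclass
        # (for example an ORM model) can supply it from its field default
        if state is not None:
            self.fsm_state = state


class EagerFSM:
    """Hypothetical contrast: always falls back to a class-level default."""

    FSM_DEFAULT_STATE = 'PENDING'

    def __init__(self, state=None):
        self.fsm_state = self.FSM_DEFAULT_STATE if state is None else state


def field_backed(base):
    """Return a subclass of `base` whose state is pre-populated like a stored field."""

    class FieldBackedRecord(base):

        def __init__(self, stored_state='DONE', **kw):
            # mimic an ORM filling in the field value before the base initialiser runs
            self.fsm_state = stored_state
            super().__init__(**kw)

    return FieldBackedRecord


print(field_backed(MiniFSM)().fsm_state)   # DONE: the stored value survives
print(field_backed(EagerFSM)().fsm_state)  # PENDING: the stored value is clobbered
```

An explicit `state=` argument still wins in both designs; only the implicit default differs.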
## Class `BlockedError(TaskError, cs.fsm.FSMError, builtins.Exception, builtins.BaseException)`
Raised by a blocked `Task` if an attempt is made to run it.
## Function `main(argv)`
Dummy main programme to exercise something.
## Function `make(*tasks, fail_fast=False, queue=None)`
Generator which completes all the supplied `tasks` by dispatching them
once they are no longer blocked.
Yield each task from `tasks` as it completes (or becomes cancelled).
Parameters:
* `tasks`: `Task`s as positional parameters
* `fail_fast`: default `False`; if true, cease evaluation as soon as a
task completes in a state which is not `DONE`
* `queue`: optional callable to submit a task for execution later
via some queue such as `Later` or Celery
The following rules are applied by this function:
- if a task is being prepared, raise an `FSMError`
- if a task is already running or queued, wait for its completion
- if a task is pending:
  * if any prerequisite has failed, fail this task
  * if any prerequisite is cancelled, cancel this task
  * if any prerequisite is pending, make it first
  * if any prerequisite is not done, fail this task
  * otherwise dispatch this task and then yield it
- if `fail_fast` and the task is not done, return
Examples:
>>> t1 = Task('t1', lambda: print('doing t1'), track=True)
>>> t2 = t1.then('t2', lambda: print('doing t2'), track=True)
>>> list(make(t2)) # doctest: +ELLIPSIS
t1 PENDING->dispatch->RUNNING
doing t1
t1 RUNNING->done->DONE
t2 PENDING->dispatch->RUNNING
doing t2
t2 RUNNING->done->DONE
[Task('t2',<function <lambda> at ...>,state='DONE')]
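The rules listed for `make` can be sketched as a small synchronous generator. `MiniTask`, `mini_make` and `mini_make_now` are hypothetical stand-ins, not `cs.taskqueue` code: tasks run inline in the calling thread, so the "being prepared" and "already running or queued" cases never arise, there is no `queue` parameter, and the prerequisite graph is assumed to be acyclic.

```python
from enum import Enum


class State(Enum):
    PENDING = 'PENDING'
    RUNNING = 'RUNNING'
    DONE = 'DONE'
    FAILED = 'FAILED'
    CANCELLED = 'CANCELLED'


class MiniTask:
    """Hypothetical synchronous task: a name, a callable and its prerequisites."""

    def __init__(self, name, func, requires=()):
        self.name = name
        self.func = func
        self.requires = list(requires)
        self.state = State.PENDING

    def dispatch(self):
        # run the function in the calling thread and record the outcome
        self.state = State.RUNNING
        try:
            self.func()
        except Exception:
            self.state = State.FAILED
        else:
            self.state = State.DONE


def mini_make(*tasks, fail_fast=False):
    """Yield each of `tasks` once it is done, failed or cancelled."""
    for task in tasks:
        if task.state is State.PENDING:
            pres = task.requires
            if any(pre.state is State.FAILED for pre in pres):
                task.state = State.FAILED
            elif any(pre.state is State.CANCELLED for pre in pres):
                task.state = State.CANCELLED
            else:
                # make any pending prerequisites first (assumes no cycles)
                pending = [pre for pre in pres if pre.state is State.PENDING]
                for _ in mini_make(*pending):
                    pass
                if all(pre.state is State.DONE for pre in pres):
                    task.dispatch()
                else:
                    task.state = State.FAILED
        yield task
        if fail_fast and task.state is not State.DONE:
            return


def mini_make_now(*tasks, fail_fast=False):
    # drive the generator to completion, in the spirit of make_now()
    return list(mini_make(*tasks, fail_fast=fail_fast))


t1 = MiniTask('t1', lambda: print('doing t1'))
t2 = MiniTask('t2', lambda: print('doing t2'), requires=[t1])
print([(t.name, t.state.name) for t in mini_make_now(t2)])
```

As in the doctest, only `t2` is yielded, but `t1` is run first because it is a pending prerequisite.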
## Function `make_later(L, *tasks, fail_fast=False)`
Dispatch each of the `tasks` via `L:Later` for asynchronous execution
unless it has already completed.
The caller can wait on `t.result` for the completion of each task `t`.
This calls `make_now()` in a thread and uses `L.defer` to
queue the task and its prerequisites for execution.
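A rough sketch of the same pattern, using the standard library's `concurrent.futures.ThreadPoolExecutor` as a stand-in for `cs.later.Later`. `SketchTask`, `run_task` and `make_later_sketch` are hypothetical: a background thread walks the prerequisites and queues each task after its prerequisites, skipping tasks whose result is already set, and the caller waits on a `Future` rather than a `Result`.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class SketchTask:
    """Hypothetical task: a callable, its prerequisites and a Future for the result."""

    def __init__(self, name, func, requires=()):
        self.name = name
        self.func = func
        self.requires = list(requires)
        self.result = Future()  # the caller waits on this


def run_task(task):
    # runs in a worker thread: wait for the prerequisites, then call func
    try:
        for pre in task.requires:
            pre.result.result()  # re-raises a prerequisite's exception
        task.result.set_result(task.func())
    except Exception as e:
        task.result.set_exception(e)


def make_later_sketch(executor, *tasks):
    """Queue `tasks` and their prerequisites from a background thread."""

    def walk():
        seen = set()

        def submit(task):
            if task in seen or task.result.done():
                return  # already queued by this walk, or already completed
            seen.add(task)
            for pre in task.requires:
                submit(pre)  # prerequisites are queued before their dependents
            executor.submit(run_task, task)

        for task in tasks:
            submit(task)

    walker = threading.Thread(target=walk, daemon=True)
    walker.start()
    return walker  # returns at once; completion is signalled via task.result


with ThreadPoolExecutor(max_workers=2) as pool:
    a = SketchTask('a', lambda: 20)
    b = SketchTask('b', lambda: a.result.result() + 1, requires=[a])
    make_later_sketch(pool, b)
    print(b.result.result(timeout=5))  # 21
```

Because the executor's work queue is first-in first-out and prerequisites are queued first, a worker waiting on a prerequisite is never waiting on work still queued behind it, so the sketch does not deadlock even with a single worker.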
## Function `make_now(*tasks, fail_fast=False, queue=None)`
Run the generator `make(*tasks)` to completion and return the
list of completed tasks.
## Class `Task(BaseTask, cs.fsm.FSM, cs.gvutils.DOTNodeMixin, cs.resources.RunStateMixin, cs.threads.HasThreadState, cs.context.ContextManagerMixin)`
A task which may require the completion of other tasks.
The model here may not be quite as expected; it is aimed at
tasks which can be repaired and rerun.
As such, if `self.run(func,...)` raises an exception from
`func` then this `Task` will still block dependent `Task`s.
Dually, a `Task` which completes without an exception is
considered complete and does not block dependent `Task`s.
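A minimal sketch of this repair-and-rerun model. `RetryableTask` and the local `BlockedError` are hypothetical (the exception is named after the library's `BlockedError` but is unrelated to it): a failed run leaves the task `FAILED`, which blocks its dependents, and a later successful rerun unblocks them.

```python
class BlockedError(Exception):
    """Hypothetical: raised when running a task whose prerequisites are not done."""


class RetryableTask:
    """Hypothetical task illustrating the repair-and-rerun model."""

    def __init__(self, name, func, requires=()):
        self.name = name
        self.func = func
        self.requires = list(requires)
        self.state = 'PENDING'
        self.result = None

    def blocked(self):
        # any prerequisite which is not DONE blocks this task
        return any(pre.state != 'DONE' for pre in self.requires)

    def run(self):
        if self.blocked():
            raise BlockedError(f'{self.name} is blocked')
        try:
            self.result = self.func()
        except Exception:
            self.state = 'FAILED'  # still blocks dependents, but may be rerun
            raise
        self.state = 'DONE'
        return self.result


config = {}
fetch = RetryableTask('fetch', lambda: config['url'])
report = RetryableTask('report', lambda: f'fetched {fetch.result}', requires=[fetch])
try:
    fetch.run()
except KeyError:
    pass  # the first attempt fails because config['url'] is missing
print(fetch.state, report.blocked())  # FAILED True
config['url'] = 'https://example.com/'  # repair the cause...
fetch.run()                             # ...and rerun
print(fetch.state, report.run())  # DONE fetched https://example.com/
```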
Keyword parameters:
* `cancel_on_exception`: if true, cancel this `Task` if `.run`
raises an exception; the default is `False`, allowing repair
and retry
* `cancel_on_result`: optional callable to test the `Task.result`
after `.run`; if the callable returns `True` the `Task` is marked
as cancelled, allowing repair and retry
* `func`: the function to call to complete the `Task`;
it will be called as `func(*func_args,**func_kwargs)`
* `func_args`: optional positional arguments, default `()`
* `func_kwargs`: optional keyword arguments, default `{}`
* `lock`: optional lock, default an `RLock`
* `state`: initial state, default from `self._state.initial_state`,
which is initially `'PENDING'`
* `track`: default `False`;
if `True` then apply a callback for all states to print task transitions;
otherwise it should be a callback function suitable for `FSM.fsm_callback`
Other arguments are passed to the `Result` initialiser.
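A sketch of what the `track` option amounts to, assuming a transition callback receives the FSM, the old state, the event name and the new state. `TrackedFSM`, `print_transition` and the `TRANSITIONS` table are hypothetical; the real `FSM.fsm_callback` signature is not documented here, and only the `PENDING->dispatch->RUNNING` and `RUNNING->done->DONE` transitions are taken from the `make` example.

```python
# Hypothetical transition table: only PENDING->dispatch->RUNNING and
# RUNNING->done->DONE come from the documented make() example.
TRANSITIONS = {
    'PENDING': {'dispatch': 'RUNNING', 'cancel': 'CANCELLED'},
    'RUNNING': {'done': 'DONE', 'error': 'FAILED', 'cancel': 'CANCELLED'},
}


class TrackedFSM:
    """Hypothetical minimal FSM showing what a `track` option can do."""

    def __init__(self, name, state='PENDING', track=False):
        self.name = name
        self.fsm_state = state
        self.callbacks = []
        if track is True:
            self.callbacks.append(print_transition)
        elif track:
            self.callbacks.append(track)  # a caller-supplied callback

    def fsm_event(self, event):
        old_state = self.fsm_state
        try:
            new_state = TRANSITIONS[old_state][event]
        except KeyError:
            raise ValueError(
                f'{self.name}: event {event!r} is invalid in state {old_state}'
            ) from None
        self.fsm_state = new_state
        for callback in self.callbacks:
            callback(self, old_state, event, new_state)
        return new_state


def print_transition(fsm, old_state, event, new_state):
    # same layout as the transitions printed in the make() example
    print(f'{fsm.name} {old_state}->{event}->{new_state}')


t1 = TrackedFSM('t1', track=True)
t1.fsm_event('dispatch')  # prints: t1 PENDING->dispatch->RUNNING
t1.fsm_event('done')      # prints: t1 RUNNING->done->DONE
```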
Example:
t1 = Task(name="task1")
t1.bg(time.sleep, 10)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, sleep, 5)
Users wanting more immediate semantics can supply
`cancel_on_exception` and/or `cancel_on_result` to control
these behaviours.
Example:
t1 = Task(name="task1")
t1.bg(time.sleep, 2)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, sleep, 5)
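A sketch of how `cancel_on_exception` and `cancel_on_result` might change the final state of a run. `CancellableTask` is hypothetical and tracks only the state, treating any truthy return from `cancel_on_result` as a request to cancel; under the `make` rules, a `CANCELLED` prerequisite cancels its dependents, whereas a `FAILED` one fails them.

```python
class CancellableTask:
    """Hypothetical task showing cancel_on_exception and cancel_on_result."""

    def __init__(self, func, cancel_on_exception=False, cancel_on_result=None):
        self.func = func
        self.cancel_on_exception = cancel_on_exception
        self.cancel_on_result = cancel_on_result
        self.state = 'PENDING'
        self.result = None
        self.exception = None

    def run(self):
        self.state = 'RUNNING'
        try:
            self.result = self.func()
        except Exception as e:
            self.exception = e
            # default: FAILED, which keeps blocking dependents until repaired
            self.state = 'CANCELLED' if self.cancel_on_exception else 'FAILED'
        else:
            if self.cancel_on_result is not None and self.cancel_on_result(self.result):
                self.state = 'CANCELLED'
            else:
                self.state = 'DONE'
        return self.state


print(CancellableTask(lambda: 1 / 0).run())                                  # FAILED
print(CancellableTask(lambda: 1 / 0, cancel_on_exception=True).run())        # CANCELLED
print(CancellableTask(lambda: [], cancel_on_result=lambda r: not r).run())   # CANCELLED
print(CancellableTask(lambda: [1], cancel_on_result=lambda r: not r).run())  # DONE
```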
## Class `TaskError(cs.fsm.FSMError, builtins.Exception, builtins.BaseException)`
Raised for `Task`-related errors.
## Class `TaskQueue`
A task queue for managing and running a set of related tasks.
Unlike `make` and `Task.make`, this is aimed at a "dispatch" worker
which dispatches individual tasks as required.
Example 1, put 2 dependent tasks in a queue and run:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1, t2)
>>> for _ in q.run(): pass
...
t1
t2
Example 2, put 1 task in a queue and run.
The queue only runs the specified tasks:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1)
>>> for _ in q.run(): pass
...
t1
Example 3, put 1 task in a queue with `run_dependent_tasks=True` and run.
The queue pulls in the dependent tasks of completed tasks and also runs those:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1, run_dependent_tasks=True)
>>> for _ in q.run(): pass
...
t1
t2
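The `TaskQueue` examples can be approximated with a much smaller dispatch loop. `QTask` and `SketchQueue` are hypothetical stand-ins, not the `cs.taskqueue` implementation: tasks run synchronously, a task whose prerequisites are not done is requeued, and the loop stops once a full pass over the queue makes no progress.

```python
from collections import deque


class QTask:
    """Hypothetical task which records links in both directions."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.requires = []
        self.dependents = []
        self.done = False

    def then(self, name, func):
        # create a new task which requires this one
        dependent = QTask(name, func)
        dependent.requires.append(self)
        self.dependents.append(dependent)
        return dependent


class SketchQueue:
    """Hypothetical dispatch loop in the spirit of TaskQueue."""

    def __init__(self, *tasks, run_dependent_tasks=False):
        self.pending = deque(tasks)
        self.seen = set(tasks)
        self.run_dependent_tasks = run_dependent_tasks

    def run(self):
        """Run ready tasks until none remain, yielding each completed task."""
        stalled = 0
        while self.pending:
            task = self.pending.popleft()
            if not all(pre.done for pre in task.requires):
                # not ready: requeue it, and stop once a full pass makes no progress
                self.pending.append(task)
                stalled += 1
                if stalled >= len(self.pending):
                    return
                continue
            stalled = 0
            task.func()
            task.done = True
            yield task
            if self.run_dependent_tasks:
                # pull in tasks which depend on the one just completed
                for dependent in task.dependents:
                    if dependent not in self.seen:
                        self.seen.add(dependent)
                        self.pending.append(dependent)


t1 = QTask('t1', lambda: print('t1'))
t2 = t1.then('t2', lambda: print('t2'))
for _ in SketchQueue(t1, run_dependent_tasks=True).run():
    pass
```

Without `run_dependent_tasks=True`, the same loop would run only `t1`, matching Example 2.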
*Method `TaskQueue.__init__(self, *tasks, run_dependent_tasks=False)`*:
Initialise the queue with the supplied `tasks`.
# Release Log
*Release 20230401*:
Add missing requirement to DISTINFO.
*Release 20230331*:
* Task: subclass BaseTask instead of (FSM, RunStateMixin).
* BaseTask.__init__: use @uses_runstate to ensure we've got a RunState.
*Release 20230217*:
Task: subclass HasThreadState, drop .current_task() class method.
*Release 20221207*:
* Pull out core stuff from Task into BaseTask, aids subclassing.
* BaseTask: explanatory docstring about unusual FSM_DEFAULT_STATE design choice.
* BaseTask.tasks_as_dot: express the edges using the node ids instead of their labels.
* BaseTask: new tasks_as_svg() method like tasks_as_dot() but returning SVG.
*Release 20220805*:
Initial PyPI release.