pythonflow package

exception pythonflow.core.EvaluationError[source]

Bases: RuntimeError

Failed to evaluate an operation.

class pythonflow.core.Graph[source]

Bases: object

Data flow graph constituting a directed acyclic graph of operations.

apply(fetches, context=None, *, callback=None, **kwargs)[source]

Evaluate one or more operations given a context.

Note

This function modifies the context in place. Use context=context.copy() to avoid the context being modified.

Parameters:
  • fetches (list[str or Operation] or str or Operation) – One or more Operation instances or names to evaluate.
  • context (dict or None) – Context in which to evaluate the operations.
  • callback (callable or None) – Callback to be evaluated when an operation is evaluated.
  • kwargs (dict) – Additional context information keyed by variable name.
Returns:

values – Output of the operations given the context.

Return type:

tuple[object]

Raises:
  • ValueError – If fetches is not an Operation instance, operation name, or a sequence thereof.
  • ValueError – If context is not a mapping.
static get_active_graph(graph=None)[source]

Obtain the currently active graph instance by returning the explicitly given graph or using the default graph.

Parameters:graph (Graph or None) – Graph to return or None to use the default graph.
Raises:ValueError – If no Graph instance can be obtained.
normalize_context(context, **kwargs)[source]

Normalize a context by replacing all operation names with operation instances.

Note

This function modifies the context in place. Use context=context.copy() to avoid the context being modified.

Parameters:
  • context (dict[Operation or str, object]) – Context whose keys are operation instances or names.
  • kwargs (dict[str, object]) – Additional context information keyed by variable name.
Returns:

normalized_context – Normalized context whose keys are operation instances.

Return type:

dict[Operation, object]

Raises:
  • ValueError – If the context specifies more than one value for any operation.
  • ValueError – If context is not a mapping.
normalize_operation(operation)[source]

Normalize an operation by resolving its name if necessary.

Parameters:

operation (Operation or str) – Operation instance or name of an operation.

Returns:

normalized_operation – Operation instance.

Return type:

Operation

Raises:
  • ValueError – If operation is not an Operation instance or an operation name.
  • RuntimeError – If operation is an Operation instance but does not belong to this graph.
  • KeyError – If operation is an operation name that does not match any operation of this graph.
class pythonflow.core.Operation(*args, length=None, graph=None, name=None, dependencies=None, **kwargs)[source]

Bases: object

Base class for operations.

Parameters:
  • args (tuple) – Positional arguments passed to the _evaluate method.
  • kwargs (dict) – Keyword arguments passed to the _evaluate method.
  • length (int or None) – Optional number of values returned by the operation. The length only needs to be specified if the operation should support iterable [unpacking](https://www.python.org/dev/peps/pep-3132/).
  • graph (Graph or None) – Data flow graph for this operation or None to use the default graph.
  • name (str or None) – Name of the operation or None to use a random, unique identifier.
  • dependencies (list) – Explicit sequence of operations to evaluate before evaluating this operation.
evaluate(context, callback=None)[source]

Evaluate the operation given a context.

Parameters:
  • context (dict) – Normalised context in which to evaluate the operation.
  • callback (callable or None) – Callback to be evaluated when an operation is evaluated.
Returns:

value – Output of the operation given the context.

Return type:

object

evaluate_dependencies(context, callback=None)[source]

Evaluate the dependencies of this operation and discard the values.

Parameters:
  • context (dict) – Normalised context in which to evaluate the operation.
  • callback (callable or None) – Callback to be evaluated when an operation is evaluated.
classmethod evaluate_operation(operation, context, **kwargs)[source]

Evaluate an operation or constant given a context.

name

Unique name of the operation

Type:str
set_name(name)[source]

Set the name of the operation and update the graph.

Parameters:

value (str) – Unique name of the operation.

Returns:

self – This operation.

Return type:

Operation

Raises:
  • ValueError – If an operation with value already exists in the associated graph.
  • KeyError – If the current name of the operation cannot be found in the associated graph.
pythonflow.core.call(func, *args, **kwargs)[source]

Call func with positional arguments args and keyword arguments kwargs.

Parameters:
  • func (callable) – Function to call when the operation is executed.
  • args (list) – Sequence of positional arguments passed to func.
  • kwargs (dict) – Mapping of keyword arguments passed to func.
pythonflow.core.control_dependencies(dependencies, graph=None)[source]

Ensure that all dependencies are executed before any operations in this scope.

Parameters:dependencies (list) – Sequence of operations to be evaluted before evaluating any operations defined in this scope.
class pythonflow.core.func_op(target, *args, **kwargs)[source]

Bases: pythonflow.core.Operation

Operation wrapper for stateless functions.

Parameters:
  • target (callable) – function to evaluate the operation
  • args (tuple) – positional arguments passed to the target
  • kwargs (dict) – keywoard arguments passed to the target
pythonflow.core.opmethod(target=None, **kwargs)[source]

Decorator for creating operations from functions.

pythonflow.operations.assert_(condition, message=None, *args, value=None)[source]

Return value if the condition is satisfied and raise an AssertionError with the specified message and args if not.

pythonflow.operations.cache(operation, get, put, key=None)[source]

Cache the values of operation.

Parameters:
  • operation (Operation) – Operation to cache.
  • get (callable(object)) – Callable to retrieve an item from the cache. Should throw KeyError or FileNotFoundError if the item is not in the cache.
  • put (callable(object, object)) – Callable that adds an item to the cache. The first argument is the key, the seconde the value.
  • key (Operation) – Key for looking up an item in the cache. Defaults to a simple hash of the arguments of operation.
Returns:

cached_operation – Cached operation.

Return type:

Operation

pythonflow.operations.cache_file(operation, filename_template, load=None, dump=None, key=None)[source]

Cache the values of operation in a file.

Parameters:
  • operation (Operation) – Operation to cache.
  • filename_template (str) – Template for the filename taking a single key parameter.
  • load (callable(str)) – Callable to retrieve an item from a given file. Should throw FileNotFoundError if the file does not exist.
  • dump (callable(object, str)) – Callable to save the item to a file. The order of arguments differs from the put argument of cache to be compatible with pickle.dump, numpy.save, etc.
  • key (Operation) – Key for looking up an item in the cache. Defaults to a simple hash of the arguments of operation.
Returns:

cached_operation – Cached operation.

Return type:

Operation

class pythonflow.operations.conditional(predicate, x, y=None, *, length=None, name=None, dependencies=None)[source]

Bases: pythonflow.core.Operation

Return x if predicate is True and y otherwise.

Note

The conditional operation will only execute one branch of the computation graph depending on predicate.

evaluate(context, callback=None)[source]

Evaluate the operation given a context.

Parameters:
  • context (dict) – Normalised context in which to evaluate the operation.
  • callback (callable or None) – Callback to be evaluated when an operation is evaluated.
Returns:

value – Output of the operation given the context.

Return type:

object

pythonflow.operations.constant(value)

Operation returning the input value.

pythonflow.operations.identity(value)[source]

Operation returning the input value.

class pythonflow.operations.lazy_constant(target, **kwargs)[source]

Bases: pythonflow.core.Operation

Operation that returns the output of target lazily.

Parameters:
  • target (callable) – Function to evaluate when the operation is evaluated.
  • kwargs (dict) – Keyword arguments passed to the constructor of Operation.
class pythonflow.operations.placeholder(name=None, **kwargs)[source]

Bases: pythonflow.core.Operation

Placeholder that needs to be given in the context to be evaluated.

pythonflow.operations.str_format(format_string, *args, **kwargs)[source]

Use python’s advanced string formatting to convert the format string and arguments.

References

https://www.python.org/dev/peps/pep-3101/

class pythonflow.operations.try_(operation, except_=None, finally_=None, **kwargs)[source]

Bases: pythonflow.core.Operation

Try to evaluate operation, fall back to alternative operations in except_, and ensure that finally_ is evaluated.

Note

The alternative operations will only be executed if the target operation fails.

Parameters:
  • operation (Operation) – Operation to evaluate.
  • except (list[(type, Operation)]) – List of exception types and corresponding operation to evaluate if it occurs.
  • finally (Operation) – Operation to evaluate irrespective of whether operation fails.
evaluate(context, callback=None)[source]

Evaluate the operation given a context.

Parameters:
  • context (dict) – Normalised context in which to evaluate the operation.
  • callback (callable or None) – Callback to be evaluated when an operation is evaluated.
Returns:

value – Output of the operation given the context.

Return type:

object

class pythonflow.util.Profiler[source]

Bases: object

Callback for profiling computational graphs.

times

Mapping from operations to execution times.

Type:dict[Operation, float]
get_slow_operations(num_operations=None)[source]

Get the slowest operations.

Parameters:num_operations (int or None) – Maximum number of operations to return or None
Returns:times – Mapping of execution times keyed by operations.
Return type:collections.OrderedDict
class pythonflow.util.batch_iterable(iterable, batch_size, transpose=False)[source]

Bases: object

Split an iterable into batches of a specified size.

Parameters:
  • iterable (iterable) – Iterable to split into batches.
  • batch_size (int) – Size of each batch.
  • transpose (bool) – Whether to transpose each batch.
pythonflow.util.deprecated(func)[source]

Mark a callable as deprecated.

class pythonflow.util.lazy_import(module)[source]

Bases: object

Lazily import the given module.

Parameters:module (str) – Name of the module to import