Note: Decision Workflows are still in experimental mode and subject to change. Please provide feedback to help us improve the feature.
Decision workflows let you orchestrate multiple app runs and processes in a single pipeline. This is useful when you want to run multiple models, compare them, and select the best one, or when you need to combine multiple data sources and pre-process them before running a model. There are endless ways to combine these building blocks, but all workflows share a common structure:
- Workflow/Pipeline: A set of steps with dependencies between them.
- Step: A single unit of work that may need other steps to complete before it can start. There are two types of steps:
  - AppStep: A step that runs another app available in the same team.
  - InlineStep: A step that runs a script inline.
- DAG: A directed acyclic graph that is implicitly defined by the dependencies between steps (see the examples below).
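The DAG is never written down explicitly; it follows from which steps need which. The following minimal plain-Python sketch (not nextpipe code) illustrates the idea: it derives a valid execution order from a hypothetical dependencies mapping using the standard library's graphlib.TopologicalSorter (Python 3.9+) and shows that cyclic dependencies are rejected.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical workflow: each step maps to the set of steps it needs.
dependencies = {
    "prepare": set(),
    "solve": {"prepare"},
    "enhance": {"solve"},
}


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return an order in which each step comes after every step it needs.

    Raises graphlib.CycleError if the dependencies contain a cycle,
    i.e. if they do not form a DAG.
    """
    return list(TopologicalSorter(deps).static_order())


print(execution_order(dependencies))  # ['prepare', 'solve', 'enhance']

# A cycle cannot be ordered, which is why the graph must be acyclic.
try:
    execution_order({"a": {"b"}, "b": {"a"}})
except CycleError:
    print("cyclic dependencies are not a valid workflow")
```

Steps that do not depend on each other (directly or indirectly) have no required relative order, which is what allows an engine to run them independently.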
nextpipe
nextpipe is a Python package for defining and executing decision pipelines. A workflow is defined as a Python class that inherits from nextpipe.FlowSpec. The functions of this class define the steps of the workflow, and decorators define the dependencies between steps and other step properties.
Available decorators:
- nextpipe.step: Marks a function as a step.
- nextpipe.needs: Defines all the steps that must complete before this step can start. The return values of the needed steps are passed to this step as arguments.
- nextpipe.app: Marks a step as an app step, meaning it runs another app available in the same team. The app is configured via parameters such as app_id, instance_id, or parameters.
- nextpipe.repeat: Repeats a step multiple times. This is useful, for example, for ensemble runs that leverage variability. Other steps that need the repeated step receive a list of results instead of a single result.
The following list provides an overview of further basic concepts for modeling a decision workflow with nextpipe:
- Steps that do not declare a dependency via needs are executed immediately and receive the input of the request as an argument.
- The overall return value of the pipeline must be chosen explicitly via flow.get_result(<step_func>) (see example below).
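To illustrate these rules without relying on the nextpipe API, here is a toy, plain-Python runner. The names needs, repeat, ToyFlow, and get_result below are hypothetical stand-ins that only mimic the behavior described above; in nextpipe, steps are functions of a FlowSpec subclass, and the real decorators and execution logic differ.

```python
import random
from graphlib import TopologicalSorter
from typing import Any, Callable


# Hypothetical stand-ins for the decorators described above. They only
# attach metadata to the function; this is NOT how nextpipe works internally.
def needs(*predecessors: Callable) -> Callable:
    def mark(fn: Callable) -> Callable:
        fn.predecessors = predecessors
        return fn
    return mark


def repeat(times: int) -> Callable:
    def mark(fn: Callable) -> Callable:
        fn.repeat = times
        return fn
    return mark


class ToyFlow:
    """Runs plain functions in dependency order (illustration only)."""

    def __init__(self, steps: list[Callable], data: Any):
        self.steps = steps
        self.data = data
        self.results: dict[Callable, Any] = {}

    def run(self) -> None:
        graph = {s: set(getattr(s, "predecessors", ())) for s in self.steps}
        for s in TopologicalSorter(graph).static_order():
            preds = getattr(s, "predecessors", ())
            # Steps without `needs` receive the request input; other steps
            # receive the return values of the steps they need, in order.
            args = [self.results[p] for p in preds] if preds else [self.data]
            if hasattr(s, "repeat"):
                # Dependents of a repeated step get a list of results.
                self.results[s] = [s(*args) for _ in range(s.repeat)]
            else:
                self.results[s] = s(*args)

    def get_result(self, step: Callable) -> Any:
        # The overall output is chosen explicitly by naming a step.
        return self.results[step]


def prepare(data: dict) -> dict:
    return {**data, "prepared": True}


@repeat(3)
@needs(prepare)
def sample(prepared: dict) -> int:
    # Stand-in for a model run whose outcome varies between runs.
    return random.randint(1, 10)


@needs(sample)
def pick(samples: list) -> int:
    # Receives a list because `sample` is repeated.
    return min(samples)


flow = ToyFlow([prepare, sample, pick], {"x": 1})
flow.run()
print(flow.get_result(pick))
```

Keeping the decorators as pure metadata and leaving ordering to the runner mirrors the idea that the DAG is implied by the declared dependencies rather than by the order in which steps are written.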
Installation
To get started, install the nextpipe package with pip.
Simple workflow example
This section provides a simple example that can be copied and pasted into a Python file. The example defines a workflow of three steps: prepare, solve, and enhance. For further examples, refer to the examples section of nextpipe.
A prerequisite for running this example is to have the echo app available in the same team as the workflow. You can find the echo app in the nextpipe repository.
The workflow receives the input data in the prepare step and simply adds a "prepared": True key to it. The solve step passes the result of the prepare step to an app called echo. This app is expected to exist in the same team and is configured via the app_id parameter. The enhance step receives the result of the solve step and adds an "enhanced": True key to it.
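The data flow through these three steps can be mimicked in plain Python. This sketch is not nextpipe code: echo_app is a hypothetical local stand-in for the remote echo app that simply returns a copy of its input, and the actual output shape returned by the echo app may differ.

```python
import copy


def prepare(data: dict) -> dict:
    # Add a "prepared" flag without mutating the caller's input.
    out = copy.deepcopy(data)
    out["prepared"] = True
    return out


def echo_app(payload: dict) -> dict:
    # Hypothetical local stand-in for the echo app: returns its input.
    # In the real workflow, this is an app run on the platform.
    return copy.deepcopy(payload)


def solve(prepared: dict) -> dict:
    # Pass the result of prepare to the (stand-in) app.
    return echo_app(prepared)


def enhance(result: dict) -> dict:
    # Add an "enhanced" flag to the app's result.
    out = copy.deepcopy(result)
    out["enhanced"] = True
    return out


def run_pipeline(data: dict) -> dict:
    # prepare -> solve -> enhance, each step consuming the previous result.
    return enhance(solve(prepare(data)))


print(run_pipeline({"hello": "world"}))
# {'hello': 'world', 'prepared': True, 'enhanced': True}
```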
Complex workflow example
This section provides a more complex decision workflow that showcases several features in combination. The workflow runs three different routing models (one of them multiple times to leverage variability) and then selects the best result, that is, the result with the minimum value.
A prerequisite for running this example is to have the following marketplace apps available in your team (use the app_ids provided below):
- Nextmv Routing: routing-nextroute
- OR-Tools Routing: routing-ortools
- PyVroom Routing: routing-pyvroom
The workflow fetches data from a URL in the fetch_data step. This showcases an external data source, which could be driven via the request body instead of sending the data directly. The workflow then runs the three different routing models. The run_nextroute step is repeated three times to leverage variability, and the pick_best step selects the result with the minimum value.
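The selection logic of pick_best can be sketched in plain Python. Model runs are replaced here by a hypothetical run_model function that returns a made-up objective under a hypothetical "value" key; the outputs of the real routing apps have their own structure. Note how the repeated nextroute step contributes a list of results, while the other two steps contribute a single result each.

```python
import random


def run_model(name: str, rng: random.Random) -> dict:
    # Hypothetical stand-in for an app run: returns a random objective value.
    return {"model": name, "value": round(rng.uniform(100.0, 200.0), 2)}


def pick_best(nextroute: list[dict], ortools: dict, pyvroom: dict) -> dict:
    # nextroute is a list because that step is repeated; the others are single results.
    candidates = [*nextroute, ortools, pyvroom]
    # The best result is the one with the minimum value.
    return min(candidates, key=lambda r: r["value"])


rng = random.Random(0)  # seeded so the sketch is reproducible
nextroute_runs = [run_model("routing-nextroute", rng) for _ in range(3)]
ortools_run = run_model("routing-ortools", rng)
pyvroom_run = run_model("routing-pyvroom", rng)

best = pick_best(nextroute_runs, ortools_run, pyvroom_run)
print(best)
```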