Decision Workflows

Run Decision Workflows

A quick introduction to decision workflows via nextpipe.

Note: Decision Workflows are still in experimental mode and subject to change. Please provide feedback to help us improve the feature.

Decision workflows are a way to orchestrate multiple app runs and processes in a single pipeline. This is useful when you want to run multiple models, compare them, and then select the best one, or when you need to combine multiple data sources and pre-process them before running a model. There are endless possibilities in how to combine things, but they all share a common structure:

  1. Workflow/Pipeline: A number of steps with dependencies between them.
  2. Step: A single item of work that (optionally) needs some other steps to be completed before it can start. There are two types of steps:
    1. AppStep: A step that runs another app that is available in the same team.
    2. InlineStep: A step that runs a script inline.
  3. DAG: A Directed Acyclic Graph that is implicitly defined by the dependencies between steps (see example below).
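As a rough illustration of how per-step dependencies implicitly define a DAG, the sketch below derives a valid execution order from a plain dictionary using Python's standard-library graphlib module. This is not how nextpipe is implemented; the dictionary layout and the helper names execution_order and is_valid_dag are assumptions made for this example only.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(needs: dict[str, set[str]]) -> list[str]:
    """Return one valid execution order for a step -> predecessors mapping."""
    return list(TopologicalSorter(needs).static_order())


def is_valid_dag(needs: dict[str, set[str]]) -> bool:
    """Return False if the dependencies contain a cycle."""
    try:
        execution_order(needs)
        return True
    except CycleError:
        return False


# Each step maps to the set of steps it needs (hypothetical layout).
simple_flow = {
    "prepare": set(),
    "solve": {"prepare"},
    "enhance": {"solve"},
}

print(execution_order(simple_flow))  # ['prepare', 'solve', 'enhance']
print(is_valid_dag({"a": {"b"}, "b": {"a"}}))  # False
```

No step lists its successors here: the graph follows entirely from what each step needs, which is the sense in which the DAG is implicit.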

nextpipe

nextpipe is a Python package that allows you to define and execute decision pipelines. The workflow is defined as a Python class inheriting from nextpipe.FlowSpec. Functions of this class define the steps of the workflow. Decorators are used to define dependencies between steps and other properties.

Available decorators:

  • nextpipe.step: Marks a function as a step.
  • nextpipe.needs: Defines all the steps that need to be completed before this step can start. Furthermore, the return value of the needed steps will be passed as arguments to the step.
  • nextpipe.app: Marks a step as an app step, meaning it will run another app available in the same team. The app is configured via parameters like app_id, instance_id or parameters.
  • nextpipe.repeat: Repeats a step multiple times. This is useful, for example, for ensemble runs that aim to leverage variability between runs. Other steps needing the repeated step will receive a list of results instead of a single result.
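For readers less familiar with stacked decorators, the following toy sketch shows how decorators can attach dependency metadata to plain functions. The names toy_step and toy_needs are hypothetical and exist only in this example; the sketch does not reflect nextpipe's actual implementation. It only illustrates that Python applies stacked decorators bottom-up, so the decorator closest to the function runs first.

```python
def toy_step(func):
    """Mark a function as a step (hypothetical stand-in, not nextpipe.step)."""
    func.is_step = True
    func.predecessors = []
    return func


def toy_needs(predecessors):
    """Record predecessors on a function already marked as a step."""

    def decorator(func):
        if not getattr(func, "is_step", False):
            raise TypeError("toy_needs must be applied above toy_step")
        func.predecessors = list(predecessors)
        return func

    return decorator


@toy_step
def prepare(data):
    return data


@toy_needs(predecessors=[prepare])
@toy_step
def solve(prepared):
    return prepared


print([p.__name__ for p in solve.predecessors])  # ['prepare']
```

In this toy version, swapping the two decorators on solve raises a TypeError, because toy_needs would run before the function is marked as a step.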

The following list provides an overview of further basic concepts of modeling a decision workflow with nextpipe:

  • Steps that do not declare a dependency via needs will be executed immediately and receive the input of the request as an argument.
  • The overall return value of the pipeline must be chosen explicitly via flow.get_result(<step_func>) (see example below).
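The data-flow rules described so far can be sketched with a small, sequential toy executor. This is a simplified model under stated assumptions, not nextpipe's implementation: the function run_toy_flow and the tuple-based step specification are hypothetical, and the real runtime may schedule and execute steps differently.

```python
from graphlib import TopologicalSorter


def run_toy_flow(steps: dict, flow_input):
    """Run steps given as name -> (func, predecessor names, repetitions)."""
    graph = {name: set(spec[1]) for name, spec in steps.items()}
    results = {}
    for name in TopologicalSorter(graph).static_order():
        func, predecessors, repetitions = steps[name]
        if predecessors:
            # Dependent steps receive the results of their predecessors.
            args = [results[p] for p in predecessors]
        else:
            # Steps without dependencies receive the flow input.
            args = [flow_input]
        if repetitions > 1:
            # A repeated step yields a list of results.
            results[name] = [func(*args) for _ in range(repetitions)]
        else:
            results[name] = func(*args)
    return results


steps = {
    "load": (lambda data: data["values"], [], 1),
    "total": (lambda values: sum(values), ["load"], 3),
    "report": (lambda totals: {"totals": totals}, ["total"], 1),
}

results = run_toy_flow(steps, {"values": [1, 2, 3]})
# The overall result is picked explicitly from one step.
print(results["report"])  # {'totals': [6, 6, 6]}
```

The report step receives a list because its predecessor is repeated three times, mirroring the behavior described for nextpipe.repeat.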

Installation

To get started with nextpipe, we only need to install the package via pip:

pip install nextpipe

Simple workflow example

This section provides a simple example that can be copied and pasted into a Python file. The example defines the following workflow of 3 steps. For further examples, please refer to the examples section of nextpipe.

A pre-requisite for running this example is to have the echo app available in the same team as the workflow. You can find the echo app in the nextpipe repository.

graph TD
    prepare(prepare)
    prepare --> solve
    solve(solve)
    solve --> enhance
    enhance(enhance)

The workflow receives the input data in the prepare step and just adds a "prepared": True key to the input. The solve step passes the result of the prepare step to an app called echo. This app is expected to exist in the same team and is configured via the app_id parameter. The enhance step receives the result of the solve step and adds a key "enhanced": True to the result.

import json
import os

import nextmv

from nextpipe import FlowSpec, app, needs, step


# >>> Workflow definition
class Flow(FlowSpec):
    @step
    def prepare(input: dict):
        input["prepared"] = True
        return input

    @app(app_id="echo")
    @needs(predecessors=[prepare])
    @step
    def solve():
        pass

    @needs(predecessors=[solve])
    @step
    def enhance(result: dict):
        output = result["solution"]  # Unwrap the solution
        output["echo"]["enhanced"] = True
        return output


def main():
    # Read API key from file (until secrets management support)
    with open("key.json") as f:
        os.environ["NEXTMV_API_KEY"] = json.load(f)["nextmv_api_key"]

    # Load input data
    input = nextmv.load_local()

    # Run workflow
    flow = Flow("DecisionFlow", input.data)
    flow.run()

    # Write out the result
    print(json.dumps(flow.get_result(flow.enhance)))


if __name__ == "__main__":
    main()
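To see what the data looks like at each stage without calling any remote app, the transformations of the simple workflow can be traced in plain Python. The function fake_echo_app is a hypothetical local stand-in; its output shape ({"solution": {"echo": ...}}) is an assumption inferred from how the enhance step unwraps the result, and the real app's output may contain additional fields.

```python
import copy


def prepare(payload: dict) -> dict:
    """Mirror of the prepare step: flag the input as prepared."""
    payload = copy.deepcopy(payload)  # avoid mutating the caller's data
    payload["prepared"] = True
    return payload


def fake_echo_app(payload: dict) -> dict:
    """Hypothetical stand-in for the echo app (assumed output shape)."""
    return {"solution": {"echo": payload}}


def enhance(result: dict) -> dict:
    """Mirror of the enhance step: unwrap the solution and flag it."""
    output = result["solution"]
    output["echo"]["enhanced"] = True
    return output


final = enhance(fake_echo_app(prepare({"hello": "world"})))
print(final)
# {'echo': {'hello': 'world', 'prepared': True, 'enhanced': True}}
```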

Complex workflow example

This section provides a more complex example of a decision workflow to showcase a number of features in combination. The workflow runs three different routing models (one of them multiple times to leverage variability) and then selects the result with the lowest value.

A pre-requisite for running this example is to have the following marketplace apps available in your team (use the app_ids provided below):

  • Nextmv Routing: routing-nextroute
  • OR-Tools Routing: routing-ortools
  • PyVroom Routing: routing-pyvroom

graph TD
    fetch_data(fetch_data)
    fetch_data --> run_nextroute
    fetch_data --> run_ortools
    fetch_data --> run_pyvroom
    run_nextroute{ }
    run_nextroute_join{ }
    run_nextroute_0(run_nextroute_0)
    run_nextroute --> run_nextroute_0
    run_nextroute_0 --> run_nextroute_join
    run_nextroute_1(run_nextroute_1)
    run_nextroute --> run_nextroute_1
    run_nextroute_1 --> run_nextroute_join
    run_nextroute_2(run_nextroute_2)
    run_nextroute --> run_nextroute_2
    run_nextroute_2 --> run_nextroute_join
    run_nextroute_join --> pick_best
    run_ortools(run_ortools)
    run_ortools --> pick_best
    run_pyvroom(run_pyvroom)
    run_pyvroom --> pick_best
    pick_best(pick_best)

The workflow fetches data from a URL in the fetch_data step. This showcases an external data source whose location could be driven via the request body, instead of sending the data directly. The workflow then runs the three routing models. The run_nextroute step is repeated three times to leverage variability, and the pick_best step selects the result with the lowest value (read from statistics.result.value).

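import json
import os

import nextmv
import requests

from nextpipe import FlowSpec, app, needs, repeat, step


# >>> Workflow definition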
class Flow(FlowSpec):
    @step
    def fetch_data(_):
        """Fetches the input data from a remote URL."""
        file_url = "https://gist.githubusercontent.com/merschformann/a90959b87d1360b604e4a9f6457340ca/raw/661e631376bdf78a07548a3cd136c1fc6e47c639/muenster.json"
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()  # Fail early on HTTP errors
        return response.json()

    @repeat(repetitions=3)
    @app(app_id="routing-nextroute")
    @needs(predecessors=[fetch_data])
    @step
    def run_nextroute():
        """Runs the model."""
        pass

    @app(app_id="routing-ortools")
    @needs(predecessors=[fetch_data])
    @step
    def run_ortools():
        """Runs the model."""
        pass

    @app(app_id="routing-pyvroom")
    @needs(predecessors=[fetch_data])
    @step
    def run_pyvroom():
        """Runs the model."""
        pass

    @needs(predecessors=[run_nextroute, run_ortools, run_pyvroom])
    @step
    def pick_best(
        results_nextroute: list[dict],
        result_ortools: dict,
        result_pyvroom: dict,
    ):
        # Pick the best result
        results = results_nextroute + [result_ortools, result_pyvroom]
        best_solution_idx = min(
            range(len(results)),
            key=lambda i: results[i]["statistics"]["result"]["value"],
        )

        # Log all values that were found
        values = [result["statistics"]["result"]["value"] for result in results]
        values.sort()
        nextmv.log(f"Values: {values}")

        # Return the best result
        return results[best_solution_idx]


def main():
    # Read API key from file (until secrets management support)
    with open("key.json") as f:
        os.environ["NEXTMV_API_KEY"] = json.load(f)["nextmv_api_key"]

    # Run workflow
    flow = Flow("DecisionFlow", None)
    flow.run()
    result = flow.get_result(flow.pick_best)
    print(json.dumps(result))


if __name__ == "__main__":
    main()
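The selection logic of the pick_best step can be tried locally on made-up data. In the sketch below, the helper make_result, the "label" key, and all numeric values are invented for illustration; only the statistics.result.value path is taken from the example above.

```python
def pick_best(results: list[dict]) -> dict:
    """Return the result with the lowest statistics.result.value."""
    best_idx = min(
        range(len(results)),
        key=lambda i: results[i]["statistics"]["result"]["value"],
    )
    return results[best_idx]


def make_result(label: str, value: float) -> dict:
    """Hypothetical helper building a minimal result-like dictionary."""
    return {"label": label, "statistics": {"result": {"value": value}}}


# The repeated step contributes a list, the other steps one result each.
results_nextroute = [
    make_result("nextroute_0", 120.5),
    make_result("nextroute_1", 118.0),
    make_result("nextroute_2", 131.2),
]
result_ortools = make_result("ortools", 125.0)
result_pyvroom = make_result("pyvroom", 119.3)

results = results_nextroute + [result_ortools, result_pyvroom]
best = pick_best(results)
print(best["label"])  # nextroute_1
```

Because the repeated step delivers a list while the other steps deliver single dictionaries, the results are flattened into one list before they are compared.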
