Build decision workflows to automate multiple steps in an optimization pipeline

Link to blog post >>

Optimization is a process. Solving a model is part of that process, but there are many other steps required to complete it: from fetching your input data to creating custom visualizations and transforming the resulting plan into a format that’s readable for operators or downstream systems.

For decision modelers, writing and sustaining these types of optimization pipelines manually across multiple systems can quickly become cumbersome. There are models to manage, services to deploy, data transformations to wrangle, and handoffs to track. Similarly, this work is equally time-consuming for an engineering team to own.

In the same way that tools like MLflow streamline this process for machine learning workflows, at Nextmv, we’ve built decision workflows specifically for the decision science space so modelers can build, observe, and manage optimization pipelines with far less engineering effort. Now, modelers can create and deploy an end-to-end workflow in minutes, monitor each step, and easily identify where issues arise with flow charts and logs in the Nextmv UI. Let’s take a broad look at decision workflows, how to set them up in Nextmv, and then dive into a few detailed examples.

What is a decision workflow?

A decision workflow is a pipeline that automates multiple steps in an optimization process. For example, a decision workflow for picking the best solution from multiple routing models might have the following steps: preparing data, converting that data to a different schema if needed, running each of the routing models, and then choosing the plan with the best solution value.

In Nextmv, you can watch as each step of the workflow progresses and completes.

Flow chart in Nextmv UI

Here’s what the example decision workflow above looks like to create and edit with the Nextmv Python SDK. Python decorators (e.g., @step) help automate passing data from one step to the next in the workflow. The optimization models being called are simply decision apps that you have in your Nextmv account. They can be prebuilt apps or custom apps. In this case, we used three routing models that are available in the Nextmv Marketplace (Nextmv Routing, OR-Tools Routing, and PyVroom Routing).

import copy
import json

import nextmv

from nextpipe import FlowSpec, app, log, needs, repeat, step


# >>> Workflow definition
class Flow(FlowSpec):
    @step
    def prepare(input: dict):
        """Prepares the data."""
        return input

    @needs(predecessors=[prepare])
    @step
    def convert(input: dict):
        """Converts the data."""
        clone = copy.deepcopy(input)
        if "defaults" in clone and "stops" in clone["defaults"] and "quantity" in clone["defaults"]["stops"]:
            clone["defaults"]["stops"]["quantity"] *= -1
        for stop in clone["stops"]:
            if "quantity" in stop:
                stop["quantity"] *= -1
        return clone

    @repeat(repetitions=2)
    @app(app_id="routing-nextroute", instance_id="latest")
    @needs(predecessors=[prepare])
    @step
    def run_nextroute():
        """Runs the model."""
        pass

    @app(app_id="routing-ortools", instance_id="latest")
    @needs(predecessors=[convert])
    @step
    def run_ortools():
        """Runs the model."""
        pass

    @app(app_id="routing-pyvroom", instance_id="latest")
    @needs(predecessors=[convert])
    @step
    def run_pyvroom():
        """Runs the model."""
        pass

    @needs(predecessors=[run_nextroute, run_ortools, run_pyvroom])
    @step
    def pick_best(
        results_nextroute: list[dict],
        result_ortools: dict,
        result_pyvroom: dict,
    ):
        """Aggregates the results."""
        results = results_nextroute + [result_ortools, result_pyvroom]
        best_solution_idx = min(
            range(len(results)),
            key=lambda i: results[i]["statistics"]["result"]["value"],
        )

        values = [result["statistics"]["result"]["value"] for result in results]
        values.sort()
        log(f"Values: {values}")

        # For test stability reasons, we always return the or-tools result
        _ = results.pop(best_solution_idx)
        return result_ortools


def main():
    # Load input data
    input = nextmv.load_local()

    # Run workflow
    flow = Flow("DecisionFlow", input.data)
    flow.run()
    result = flow.get_result(flow.pick_best)
    print(json.dumps(result))


if __name__ == "__main__":
    main()

You can run the workflow directly from the Nextmv UI by choosing an input file, the instance of the workflow you’d like to run, required secrets (e.g., API keys), and optional configuration.

Running a decision workflow in the Nextmv UI

Once the run kicks off, you’ll have access to the logs for the workflow as it progresses through each step. This view will highlight exactly where in the flow any issues occur, making troubleshooting easier.

Decision workflow logs in the Nextmv UI

The Details page highlights any custom statistics you’ve chosen to surface, along with run information such as configuration.

Decision workflow run details in the Nextmv UI

The Results page displays the selected plan. In this case, we see routes on a map, but you could also include a custom visualization as part of the workflow to highlight specific KPIs.

Results of a decision workflow on a map in the Nextmv UI

Link to blog post >>