Databricks + Nextmv: Orchestrating Decision Optimization Workflows

Link to demo

We’ve been exploring how to combine the strengths of Databricks and Nextmv to streamline decision optimization workflows—and wanted to share a practical example of how this works in code.

This example shows how to trigger a Databricks job from a Nextmv workflow and pull the result back into the Nextmv platform for visualization, comparison, and version tracking. The setup is lightweight but powerful, and it's a good example of connecting compute platforms with decision infrastructure.

:light_bulb: Why use Nextmv for orchestration?

Nextmv has added decision model operations support like:

:white_check_mark: Visualizing model outcomes in a structured, shareable format
:white_check_mark: Comparing different parameter sets and model versions
:white_check_mark: Giving non-technical stakeholders a no-code way to test changes
:white_check_mark: Standardizing how decision models are run, tested, and iterated on

By pairing Databricks with Nextmv, you can continue developing in your familiar notebook environment while adding powerful deployment and observability layers behind the scenes.

You can also deploy your model code to Nextmv, manage multiple versions, and more.

:link: How it works

We define a DecisionFlow using the nextpipe library (Nextmv’s workflow engine):

  1. Step 1: Trigger a Databricks Job

    • The create_db_run step authenticates using environment variables (DATABRICKS_HOST, DATABRICKS_TOKEN) and triggers a job in Databricks using their Python SDK.
    • The job ID is passed in via a Nextmv parameter for flexibility across environments.
  2. Step 2: Retrieve Output

    • The return_result step fetches the run output once the Databricks job completes.
    • It parses the notebook’s JSON output and appends metadata (db_job_id, db_task_run_id) into the result object for traceability.
  3. Step 3: Show Output in Nextmv

    • The result is returned in a format compatible with Nextmv’s Output schema, making it instantly viewable and shareable in the Nextmv UI.
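Step 2's metadata append assumes the notebook output already contains the nested `statistics.result.custom` object. A defensive sketch in plain Python (the `attach_metadata` helper is illustrative, not part of the demo) creates that path if it is missing:

```python
def attach_metadata(result: dict, db_job_id: int, db_task_run_id: str) -> dict:
    """Attach Databricks traceability IDs, creating intermediate keys if absent."""
    custom = (
        result.setdefault("statistics", {})
        .setdefault("result", {})
        .setdefault("custom", {})
    )
    custom["db_job_id"] = db_job_id
    custom["db_task_run_id"] = db_task_run_id
    return result

# Works whether or not the nested keys already exist:
enriched = attach_metadata({"solution": {}}, 123, "run-456")
```

This keeps the workflow from raising a `KeyError` when a notebook emits a sparser payload than expected.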

:hammer_and_wrench: Setup Requirements

  • A Databricks job (and its job ID) already configured to run a notebook or other workload
  • Nextmv workspace with secrets configured for DATABRICKS_HOST and DATABRICKS_TOKEN
  • Output from the notebook must be valid JSON with a structure compatible with Nextmv's Output schema (e.g., containing statistics, solution, etc.)
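For the last requirement, a minimal sketch of what the notebook might return (the field contents are illustrative assumptions, not the complete Nextmv Output schema):

```python
import json

# Illustrative payload shape; the exact contents of "solution" and the
# statistics fields depend on your model and are assumptions here.
payload = {
    "solution": {"routes": []},  # hypothetical solution body
    "statistics": {
        "result": {
            "value": 123.4,
            "custom": {},  # the workflow appends db_job_id / db_task_run_id here
        }
    },
}

# In a Databricks notebook, you would return this string to the caller with:
#   dbutils.notebook.exit(json.dumps(payload))
serialized = json.dumps(payload)
```

The workflow's `return_result` step then parses this string with `json.loads` before attaching the traceability metadata.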

This workflow opens up great possibilities for hybrid teams—data scientists building in Databricks and operations stakeholders consuming results via a clean, purpose-built UI.

Would love to hear how others are integrating decision tooling into their ML and data pipelines!

#DecisionOps #Databricks #Nextmv #Optimization #MLOps #Orchestration

# >>> Workflow definition
import json

import nextmv
from databricks.sdk import WorkspaceClient
from nextpipe import FlowSpec, needs, step

# The Databricks job ID is passed in as a Nextmv option so it can vary across
# environments. (This definition was implied but not shown in the original snippet.)
options = nextmv.Options(nextmv.Option("db_job_id", int, description="Databricks job ID to run"))


class DecisionFlow(FlowSpec):
    @step
    def create_db_run(_):
        """Triggers the job on Databricks and waits for it to finish."""
        db_job_id = options.db_job_id

        # Authenticate (assumes DATABRICKS_HOST and DATABRICKS_TOKEN env vars are set as Nextmv secrets)
        w = WorkspaceClient()
        # run_now(...).result() blocks until the job run completes.
        run = w.jobs.run_now(job_id=db_job_id).result()
        nextmv.log(f"Created DB run with ID: {run.run_id} for job with ID: {db_job_id}")
        nextmv.log(f"Run tasks: {run.tasks}")
        # Return the run ID of the first (and, in this demo, only) task.
        return run.tasks[0].run_id

    @needs(predecessors=[create_db_run])
    @step
    def return_result(run_id: str):
        """Fetches the run output and attaches traceability metadata."""
        # Authenticate (assumes DATABRICKS_HOST and DATABRICKS_TOKEN env vars are set)
        w = WorkspaceClient()
        # Output is fetched per task run, which is why create_db_run returns the task run ID.
        nextmv.log(f"Getting output for DB task: {run_id}")
        run_output = w.jobs.get_run_output(run_id=run_id)
        # The notebook is expected to exit with a JSON string compatible with Nextmv's Output schema.
        result = json.loads(run_output.notebook_output.result)
        nextmv.log("Adding DB task run ID to Nextmv output")
        result["statistics"]["result"]["custom"]["db_task_run_id"] = run_id
        result["statistics"]["result"]["custom"]["db_job_id"] = options.db_job_id
        return result