Databricks Connect
Databricks Connect lets you keep your Python code centralized in your Dagster project while Spark workloads execute remotely on a Databricks cluster. Unlike job submission approaches, where code is packaged and run on Databricks, your Python code runs in the Dagster process and only the Spark operations execute on Databricks compute.
When to use Databricks Connect
Databricks Connect is best for:
- Interactive development and quick iterations
- Centralized code that doesn't need deployment to Databricks
- Moderate-sized workloads where you want simpler debugging
- Greenfield Databricks use cases
It's not suitable for:
- Large batch jobs that should run independently
- Long-running workloads that would block the Dagster process
- Scenarios where network connectivity to Databricks is unreliable
Step 1: Prepare a Dagster project
- To begin, you'll need a Dagster project. You can use an existing project or create a new one.
- Activate your project virtual environment:
  source .venv/bin/activate
- Add the databricks-connect library to your project. With uv:
  uv add databricks-connect
  Or with pip:
  pip install databricks-connect
- Configure your environment:
  export DATABRICKS_HOST=https://dbc-xxxxxxx-yyyy.cloud.databricks.com/
  export DATABRICKS_TOKEN=<your-personal-access-token>
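Because the session is built from these two variables, it can help to fail fast with a clear message when one is missing or malformed. The following is a minimal sketch, assuming the variable names above; `read_databricks_env` is a hypothetical helper, not part of Databricks Connect or Dagster.

```python
import os
from typing import Mapping, Optional, Tuple
from urllib.parse import urlparse


def read_databricks_env(env: Optional[Mapping[str, str]] = None) -> Tuple[str, str]:
    """Return (host, token) for Databricks Connect, failing fast if either is unusable.

    Hypothetical helper for illustration; not part of databricks-connect or Dagster.
    Reads os.environ unless a mapping is passed in (handy for tests).
    """
    source = os.environ if env is None else env
    host = source.get("DATABRICKS_HOST", "").strip()
    token = source.get("DATABRICKS_TOKEN", "").strip()

    # Report every missing variable at once rather than one per attempt
    missing = [
        name
        for name, value in (("DATABRICKS_HOST", host), ("DATABRICKS_TOKEN", token))
        if not value
    ]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")

    # Require a full URL with an https:// scheme and a hostname
    parsed = urlparse(host)
    if parsed.scheme != "https" or not parsed.netloc:
        raise RuntimeError(f"DATABRICKS_HOST must be an https:// URL, got {host!r}")

    return host, token
```

You could call it before building the session and pass the returned values to `DatabricksSession.builder.remote(host=..., token=...)`.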
Step 2: Write a script to run Spark operations on Databricks
Next, write a Python script that connects to Databricks to run Spark operations.
In the example below:
- The Python code runs in your Dagster deployment
- Spark DataFrame operations execute remotely on Databricks
- You have direct access to the Spark API within your asset functions
- There is no job submission overhead for interactive workloads
import os

from databricks.connect import DatabricksSession  # ty: ignore[unresolved-import]
from pyspark.sql import SparkSession

import dagster as dg

# Create the Databricks session resource.
# dg.EnvVar is resolved only when used in Dagster config (for example, resource
# fields), so read the values from the environment directly when building the
# session at import time.
databricks_session = (
    DatabricksSession.builder.remote(
        host=os.environ["DATABRICKS_HOST"],
        token=os.environ["DATABRICKS_TOKEN"],
    )
    .serverless()
    .getOrCreate()
)


@dg.definitions
def resources():
    # Register the session under the "spark" key so assets can request it by name
    return dg.Definitions(resources={"spark": databricks_session})


@dg.asset
def my_spark_asset(
    context: dg.AssetExecutionContext, spark: dg.ResourceParam[SparkSession]
) -> dg.MaterializeResult:
    # This code runs in Dagster, but Spark operations execute on Databricks
    df = spark.sql("SELECT * FROM catalog.schema.table")
    # count() triggers remote execution and returns a plain Python int
    result = df.filter(df.status == "active").count()
    return dg.MaterializeResult(metadata={"row_count": result})
Data validation example
A common use case for Databricks Connect is enforcing data quality gates in a pipeline. Because your Python logic runs in Dagster, you can raise exceptions that block downstream assets when validation fails.
The example below runs a single Spark SQL query against a Unity Catalog table that computes both the row count and the null rate of the email column, and raises an error if either threshold is breached (fewer than 1,000 rows, or more than 5% null emails). The downstream customer_summary_report asset materializes only when validated_customer_data succeeds.
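The query in validated_customer_data returns one row with two columns: `COUNT(*)` counts every row, and the `SUM(CASE WHEN ...)` expression counts rows whose email is NULL. On an empty table, SQL's `SUM` returns NULL rather than 0. The sketch below mirrors that aggregation over in-memory records in plain Python to make the edge case concrete; `aggregate_email_nulls` is a hypothetical name used only for illustration.

```python
from typing import Iterable, Mapping, Optional, Tuple


def aggregate_email_nulls(
    rows: Iterable[Mapping[str, Optional[str]]],
) -> Tuple[int, Optional[int]]:
    """Mirror COUNT(*) AS cnt and SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) AS nulls.

    Hypothetical helper for illustration only. As in SQL, the null count is
    None (NULL) when there are no rows, because SUM over zero rows is NULL.
    """
    cnt = 0
    nulls = 0
    for row in rows:
        cnt += 1  # COUNT(*) counts every row, including ones with a NULL email
        if row.get("email") is None:
            nulls += 1  # CASE WHEN email IS NULL THEN 1 ELSE 0 END
    return cnt, (nulls if cnt > 0 else None)


rows = [
    {"email": "a@example.com"},
    {"email": None},
    {"email": "c@example.com"},
    {"email": None},
]
print(aggregate_email_nulls(rows))  # (4, 2)
print(aggregate_email_nulls([]))  # (0, None)
```

For the empty-table result, `float(None)` would raise a `TypeError`; the `if row_count > 0 else 0.0` guard in validated_customer_data avoids ever evaluating it.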
from pyspark.sql import SparkSession

import dagster as dg


@dg.asset(
    group_name="customer_data_pipeline",
    description="Raw customer records in Unity Catalog from upstream ingestion.",
    kinds={"databricks"},
)
def raw_customer_table(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    context.log.info("Raw customer table available at catalog.schema.customers_raw")
    return dg.MaterializeResult(
        metadata={
            "table": dg.MetadataValue.text("catalog.schema.customers_raw"),
        }
    )


@dg.asset(
    group_name="customer_data_pipeline",
    description=(
        "Validates row counts and null rates on raw customer data using Spark on Databricks. "
        "Blocks downstream assets if data quality thresholds are not met."
    ),
    kinds={"databricks", "spark"},
    deps=[raw_customer_table],
)
def validated_customer_data(
    context: dg.AssetExecutionContext,
    spark: dg.ResourceParam[SparkSession],
) -> dg.MaterializeResult:
    # One query computes both aggregates on Databricks
    df = spark.sql(
        "SELECT COUNT(*) AS cnt, "
        "SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) AS nulls "
        "FROM catalog.schema.customers_raw"
    )
    # collect() brings the single result row back to the Dagster process
    row = df.collect()[0]
    row_count = int(row["cnt"])
    null_rate = float(row["nulls"]) / row_count if row_count > 0 else 0.0

    # Raising here fails the asset and blocks downstream assets
    if row_count < 1_000:
        raise ValueError(f"Row count {row_count:,} is below minimum threshold of 1,000")
    if null_rate > 0.05:
        raise ValueError(f"Null rate {null_rate:.1%} exceeds maximum of 5%")

    context.log.info(
        f"Validation passed: {row_count:,} rows, {null_rate:.1%} null rate"
    )
    return dg.MaterializeResult(
        metadata={
            "row_count": dg.MetadataValue.int(row_count),
            "null_rate": dg.MetadataValue.float(null_rate),
            "validation": dg.MetadataValue.text("passed"),
        }
    )


@dg.asset(
    group_name="customer_data_pipeline",
    description="Customer summary report — only runs after validation passes.",
    deps=[validated_customer_data],
)
def customer_summary_report(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
    context.log.info("Generating customer summary report from validated data.")
    return dg.MaterializeResult(
        metadata={
            "report": dg.MetadataValue.text("customer_summary_latest"),
            "status": dg.MetadataValue.text("generated"),
        }
    )
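The spark resource is the DatabricksSession configured in Step 2. Validation failures surface as asset failures in the Dagster UI, and the error message, which includes the measured row count or null rate, appears in the event log. When validation passes, the row count and null rate are recorded as asset metadata.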
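Because the threshold checks in validated_customer_data are plain Python, one option is to move them into a function that takes the two aggregates and raises on failure, so the gate can be unit-tested without a Databricks connection. This is a minimal sketch, assuming the same thresholds as the asset (at least 1,000 rows, at most 5% null emails); `check_customer_quality`, `MIN_ROWS`, and `MAX_NULL_RATE` are hypothetical names.

```python
from typing import Optional

# Hypothetical constants matching the thresholds used in validated_customer_data
MIN_ROWS = 1_000
MAX_NULL_RATE = 0.05


def check_customer_quality(
    row_count: int,
    null_count: Optional[int],
    min_rows: int = MIN_ROWS,
    max_null_rate: float = MAX_NULL_RATE,
) -> float:
    """Return the email null rate, or raise ValueError if a threshold is breached."""
    # SUM(...) is NULL on an empty table, so only divide when there are rows
    null_rate = float(null_count) / row_count if row_count > 0 else 0.0
    if row_count < min_rows:
        raise ValueError(
            f"Row count {row_count:,} is below minimum threshold of {min_rows:,}"
        )
    if null_rate > max_null_rate:
        raise ValueError(
            f"Null rate {null_rate:.1%} exceeds maximum of {max_null_rate:.0%}"
        )
    return null_rate
```

For example, `check_customer_quality(2_000, 50)` returns 0.025, while `check_customer_quality(1_200, 90)` raises because 7.5% exceeds 5%. Inside the asset, the two `if` blocks could then become a single call such as `null_rate = check_customer_quality(int(row["cnt"]), row["nulls"])`.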