Migrating an Airflow PythonOperator to Dagster
In this page, we'll explain how to migrate an Airflow PythonOperator to Dagster.
About the Airflow PythonOperator
In Airflow, the PythonOperator runs arbitrary Python functions. For example, you might have a task that runs a function `write_to_db`, which combs a directory for files and writes each one to a database table.
```python
from pathlib import Path
from typing import Any

from airflow.operators.python import PythonOperator

RAW_DATA_DIR = Path("path")


def contents_as_df(path: Path) -> Any:
    pass


def upload_to_db(df: Any):
    pass


def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)


PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
```
Dagster equivalent
The Dagster equivalent is to construct an `asset`- or `multi_asset`-decorated function that materializes assets corresponding to what your Python function is doing.
```python
from pathlib import Path
from typing import Any

from dagster import asset

RAW_DATA_DIR = Path("path")
TABLE_URI = "blah"


def contents_as_df(path: Path) -> Any:
    pass


def upload_to_db(df):
    pass


@asset(key=TABLE_URI)
def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)
```
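The example above materializes a single asset keyed by `TABLE_URI`. If the function writes several distinct tables, a `multi_asset` can model each one as its own asset. The sketch below is illustrative only: the table names and function name are assumptions, not part of the original example.

```python
from dagster import AssetSpec, MaterializeResult, multi_asset


# Hypothetical variant: one function writes two tables, each modeled as its own asset.
@multi_asset(specs=[AssetSpec("raw_customers"), AssetSpec("raw_orders")])
def write_tables_to_db():
    for raw_file in RAW_DATA_DIR.iterdir():
        upload_to_db(contents_as_df(raw_file))
    # Report a materialization for each table that was written.
    yield MaterializeResult(asset_key="raw_customers")
    yield MaterializeResult(asset_key="raw_orders")
```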
Migrating the operator
Migrating the operator breaks down into a few steps:
- Making a shared library, containing your Python function, available to both Airflow and Dagster.
- Writing an `@asset`-decorated function which runs the Python function shared between both modules.
- Using `dagster-airlift` to proxy execution of the original task to Dagster (sketched below).
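As a rough sketch of that last step, `dagster-airlift`'s proxying helper is typically invoked at the bottom of the DAG file, pointing at a YAML file that marks which tasks are proxied. Treat the exact import paths and YAML layout below as assumptions to confirm against the `dagster-airlift` docs.

```python
# At the bottom of the Airflow DAG file, after the DAG and its tasks are defined.
# Assumes a proxied_state/<dag_id>.yaml next to the DAG that marks the task, e.g.:
#   tasks:
#     - id: db_upload
#       proxied: True
from pathlib import Path

from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
)
```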
Step 1: Building a shared library
We recommend a monorepo setup for migration; this allows you to keep all your code in one place and easily share code between Airflow and Dagster, without complex CI/CD coordination.
First, we recommend factoring your Python function out into a shared package that is available to both the Airflow and Dagster runtimes. The process is as follows:
- Scaffold out a new Python project which will contain your shared infrastructure.
- Ensure that the shared library is available to both your Airflow and Dagster deployments. This can be done by adding an editable requirement to the `setup.py` or `pyproject.toml` file in your Airflow/Dagster package (see the sketch after this list).
- Include the Python dependencies relevant to your particular function in your new package. Write your Python function in the shared package, and change your Airflow code to import the function from the shared library.
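For example, the shared package might ship a minimal `setup.py` like the sketch below (the package name, module layout, and `pandas` dependency are placeholders, not part of the original example). Both the Airflow and Dagster environments can then install it editably, e.g. `pip install -e ./shared`, or list it as an editable requirement in their own `setup.py`/`pyproject.toml`.

```python
# shared/setup.py -- minimal build configuration for the shared package (illustrative names).
from setuptools import find_packages, setup

setup(
    name="shared",
    packages=find_packages(),
    # Only the dependencies the shared function itself needs -- not Airflow or Dagster.
    install_requires=["pandas"],
)
```

With the package installed in both environments, the DAG file and the Dagster asset can both import the function, e.g. `from shared.db_upload import write_to_db` (an assumed module path), instead of defining it inline.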
To illustrate what this might look like, let's say you originally have this project structure in Airflow:
```
airflow_repo/
├── airflow_package/
│   └── dags/
│       └── my_dag.py  # Contains your Python function
```
With DAG code that looks like this:
```python
from pathlib import Path
from typing import Any

from airflow.operators.python import PythonOperator

RAW_DATA_DIR = Path("path")


def contents_as_df(path: Path) -> Any:
    pass


def upload_to_db(df: Any):
    pass


def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)


PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
```
You might create a new top-level package to contain the shared code:
```
airflow_repo/
├── airflow_package/
│   └── dags/
```