Defining assets that depend on other assets

Asset definitions can depend on other asset definitions. The dependent asset is called the downstream asset, and the asset it depends on is the upstream asset.

tip

You can scaffold assets from the command line by running dg scaffold defs dagster.asset <path/to/asset_file.py>. For more information, see the dg CLI docs.

Defining basic dependencies

You can define a dependency between two assets by passing the upstream asset to the deps parameter in the downstream asset's @asset decorator.

In this example, the asset sugary_cereals creates a new table (sugary_cereals) by selecting records from the cereals table. Then the asset shopping_list creates a new table (shopping_list) by selecting records from sugary_cereals:

src/<project_name>/defs/assets.py
import dagster as dg


@dg.asset
def sugary_cereals() -> None:
    execute_query(
        "CREATE TABLE sugary_cereals AS SELECT * FROM cereals WHERE sugar_grams > 10"
    )


@dg.asset(deps=[sugary_cereals])
def shopping_list() -> None:
    execute_query("CREATE TABLE shopping_list AS SELECT * FROM sugary_cereals")
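The example above calls execute_query without defining it. A minimal sketch of such a helper, assuming a local SQLite database, might look like the following. The DB_PATH value and the helper's signature are illustrative assumptions, not part of Dagster; substitute your own warehouse client.

```python
import sqlite3

# Hypothetical database location; point this at your own warehouse.
DB_PATH = "cereals.db"


def execute_query(query: str, db_path: str = DB_PATH) -> None:
    """Run a single SQL statement against the database and commit it."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(query)
        conn.commit()
    finally:
        # Always release the connection, even if the statement fails.
        conn.close()
```

Because the assets return None and only declare deps, Dagster tracks the ordering and lineage while the helper does the actual reading and writing.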

Defining asset dependencies across code locations

info

Assets in different code locations cannot be materialized in the same run. To trigger a downstream asset in another code location after an upstream asset materializes, use a sensor or declarative automation.

Assets can depend on assets in different code locations.

note

Declaring a dependency with deps only tracks lineage across code locations. To pass the upstream asset's data to the downstream asset as an input, see Using data from another code location as an input.

In the following example, the code_location_1_asset asset produces a JSON string from a file in code_location_1:

src/<project_name>/defs/assets.py
import json

import dagster as dg


@dg.asset
def code_location_1_asset():
    with open("/tmp/data/code_location_1_asset.json", "w+") as f:
        json.dump(5, f)

In code_location_2, we can reference code_location_1_asset via its asset key:

src/<project_name>/defs/assets.py
import json

import dagster as dg


@dg.asset(deps=["code_location_1_asset"])
def code_location_2_asset():
    with open("/tmp/data/code_location_1_asset.json") as f:
        x = json.load(f)

    with open("/tmp/data/code_location_2_asset.json", "w+") as f:
        json.dump(x + 6, f)

Using data from another code location as an input

Declaring a dependency with deps tracks lineage across code locations, but does not pass the upstream asset's data to the downstream asset as an input.

To use data from an asset in another code location as an input to a downstream asset, declare the upstream asset as an AssetSpec in your code location with an I/O manager key. Include the AssetSpec in your Definitions object alongside your downstream asset.

note

SourceAsset was the previous way to do this and is now deprecated. Use AssetSpec(...).with_io_manager_key(...) instead.

In the example below, daily_sales_data is defined in another code location and used as an input to enriched_sales_data:

src/<project_name>/defs/assets.py
import dagster as dg


@dg.io_manager
def warehouse_io_manager():
    class WarehouseIOManager(dg.IOManager):
        def handle_output(self, context, obj):
            pass

        def load_input(self, context):
            # load data from storage
            return [1, 2, 3]

    return WarehouseIOManager()


# Declare the upstream asset from code location 1 as an AssetSpec.
# This tells Dagster to load it via warehouse_io_manager at runtime.
daily_sales_data = dg.AssetSpec(key="daily_sales_data").with_io_manager_key(
    "warehouse_io_manager"
)


@dg.asset
def enriched_sales_data(daily_sales_data: list) -> list:
    # daily_sales_data is loaded from storage via warehouse_io_manager
    return [x * 2 for x in daily_sales_data]


# Include the AssetSpec alongside your assets in Definitions
defs = dg.Definitions(
    assets=[daily_sales_data, enriched_sales_data],
    resources={"warehouse_io_manager": warehouse_io_manager},
)

Cross-code-location dependencies on partitioned assets work the same way. For more information, see Partitioning assets.