Customizing on_cron
You can scaffold assets from the command line by running dg scaffold defs dagster.asset <path/to/asset_file.py>. For more information, see the dg CLI docs.
When to use on_cron vs. a job-based schedule
AutomationCondition.on_cron() is the recommended starting point for most asset orchestration on Dagster 1.7 and later. Reach for on_cron when you want:
- A schedule defined alongside a single asset or a small group of assets.
- An asset-centric workflow that benefits from dependency tracking, freshness, and the rest of declarative automation.
- Independent parallel materializations across the targeted assets, with no implicit ordering between them.
Reach for a job-based schedule instead when you need:
- Complex orchestration logic across ops, multiple assets, and external steps.
- Conditional execution flows or shared state between steps.
- Mixed asset and op workflows that don't map cleanly to per-asset conditions.
When you use on_cron against multiple assets, each asset materializes independently in its own run; there is no queue or ordering between them. If you need to coordinate them as a single unit, use a job. For monitoring patterns that complement on_cron, see Asset sensors, asset checks, and Op hooks.
Ignoring dependencies
By default, AutomationCondition.on_cron() waits for all upstream dependencies to be updated before executing the asset it's attached to. In some cases, it can be useful to leave some upstream dependencies out of this calculation. To do this, pass an AssetSelection of the dependencies to ignore to .ignore():
import dagster as dg
condition = dg.AutomationCondition.on_cron("@hourly").ignore(
    dg.AssetSelection.assets("foo")
)
Alternatively, you can pass an AssetSelection to .allow(). Only the selected upstream dependencies are then considered:
import dagster as dg
condition = dg.AutomationCondition.on_cron("@hourly").allow(
    dg.AssetSelection.groups("abc")
)
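To make the effect of .ignore() and .allow() concrete, here is a minimal, dependency-free sketch in plain Python. It is a toy model and not Dagster's implementation. The function names, asset keys, and group mapping are hypothetical. It only models which upstream keys the "all dependencies updated" check would still wait on.

```python
# Toy model only: this is NOT how Dagster implements .ignore()/.allow().
# It shows which upstream dependencies an on_cron() condition would still
# wait on after narrowing. All asset keys and groups are hypothetical.


def deps_to_wait_on(deps, groups, ignore=(), allow_groups=None):
    """Return the upstream keys that still gate the asset.

    deps: upstream asset keys of the target asset
    groups: mapping of asset key -> group name
    ignore: keys to drop, analogous to .ignore(AssetSelection.assets(...))
    allow_groups: if set, keep only keys in these groups, analogous to
        .allow(AssetSelection.groups(...))
    """
    considered = set(deps) - set(ignore)
    if allow_groups is not None:
        considered = {key for key in considered if groups.get(key) in allow_groups}
    return considered


def all_considered_deps_updated(considered, updated):
    """True when every dependency that still gates the asset has updated."""
    return all(updated.get(key, False) for key in considered)


deps = {"foo", "bar", "baz"}
groups = {"foo": "abc", "bar": "abc", "baz": "xyz"}
# "foo" has not updated since the last cron tick; the others have.
updated = {"foo": False, "bar": True, "baz": True}

ignored = deps_to_wait_on(deps, groups, ignore={"foo"})
allowed = deps_to_wait_on(deps, groups, allow_groups={"abc"})

print(sorted(ignored), all_considered_deps_updated(ignored, updated))  # ['bar', 'baz'] True
print(sorted(allowed), all_considered_deps_updated(allowed, updated))  # ['bar', 'foo'] False
```

With .ignore(), a stale foo no longer blocks the asset. With .allow() limited to group abc, foo is still considered, so the asset keeps waiting.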
Waiting for all blocking asset checks to complete before executing
The AutomationCondition.all_deps_blocking_checks_passed() condition becomes true after all upstream blocking checks have passed.
This can be combined with AutomationCondition.on_cron() to ensure that your asset does not execute if upstream data is failing data quality checks:
import dagster as dg
condition = (
    dg.AutomationCondition.on_cron("@hourly")
    & dg.AutomationCondition.all_deps_blocking_checks_passed()
)
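The & operator combines the two conditions with a logical AND. The asset is requested only when the on_cron() portion is satisfied and every upstream blocking check has passed. The following plain-Python sketch models that truth table. It is a simplified illustration, not Dagster's implementation, and the asset and check names are hypothetical.

```python
# Toy model only (not Dagster's implementation) of the combined condition
#   on_cron(...) & all_deps_blocking_checks_passed()
# Asset and check names are hypothetical.


def on_cron_part(tick_passed, deps_updated):
    """Simplified on_cron(): a tick has passed and all deps updated since it."""
    return tick_passed and all(deps_updated.values())


def blocking_checks_part(blocking_check_results):
    """Simplified all_deps_blocking_checks_passed(): every upstream blocking check passed."""
    return all(blocking_check_results.values())


def should_request(tick_passed, deps_updated, blocking_check_results):
    # & between automation conditions behaves like a logical AND.
    return on_cron_part(tick_passed, deps_updated) and blocking_checks_part(
        blocking_check_results
    )


deps_updated = {"raw_orders": True}
failing_checks = {"raw_orders_not_null": True, "raw_orders_unique_ids": False}
passing_checks = {"raw_orders_not_null": True, "raw_orders_unique_ids": True}

print(should_request(True, deps_updated, failing_checks))  # False: a blocking check failed
print(should_request(True, deps_updated, passing_checks))  # True
```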
Executing later than upstream assets
By default, a single cron schedule determines the point in time that an asset starts looking for upstream data, as well as the earliest point that it would be valid to execute that asset. Sometimes, it can be useful to start looking for upstream updates at an earlier time than the cron schedule on which you want the asset to execute.
This can be achieved by modifying the AutomationCondition.all_deps_updated_since_cron() sub-condition. In this example, we want our asset to materialize at 9:00 AM each day, but start looking for upstream data as soon as the midnight boundary is passed:
import dagster as dg
NINE_AM_CRON = "0 9 * * *"
condition = dg.AutomationCondition.on_cron(NINE_AM_CRON).replace(
    old=dg.AutomationCondition.all_deps_updated_since_cron(NINE_AM_CRON),
    new=dg.AutomationCondition.all_deps_updated_since_cron("0 0 * * *"),
)
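The following worked example shows why the replacement matters. It is a simplified model, not Dagster's implementation. It handles only daily crons of the form "0 <hour> * * *", ignores time zones, and uses a hypothetical upstream asset that was last updated at 3:00 AM. It compares the look-back start used by the default condition with the one used by the replaced sub-condition.

```python
from datetime import datetime, timedelta

# Simplified model (not Dagster's implementation) of all_deps_updated_since_cron()
# for daily crons of the form "0 <hour> * * *". Asset names and timestamps are
# hypothetical, and time zones are ignored.


def latest_daily_tick(now, hour):
    """Most recent tick at or before `now` for the cron "0 <hour> * * *"."""
    tick = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if tick > now:
        tick -= timedelta(days=1)
    return tick


def all_deps_updated_since(tick, last_updated):
    """True when every upstream asset was updated after `tick`."""
    return all(ts > tick for ts in last_updated.values())


# Evaluated shortly after the 9:00 AM tick; upstream data landed at 3:00 AM.
now = datetime(2024, 5, 1, 9, 5)
last_updated = {"upstream_asset": datetime(2024, 5, 1, 3, 0)}

# Default on_cron(NINE_AM_CRON): look for updates since 9:00 AM today.
default_ready = all_deps_updated_since(latest_daily_tick(now, 9), last_updated)
# Replaced sub-condition: look for updates since midnight today.
shifted_ready = all_deps_updated_since(latest_daily_tick(now, 0), last_updated)

print(default_ready, shifted_ready)  # False True
```

Under the default condition, the 3:00 AM update predates the 9:00 AM tick, so the asset keeps waiting for a newer upstream update. With the midnight look-back, the same update counts, and the asset becomes eligible once the 9:00 AM tick passes.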
Resolving dependencies through virtual assets (views)
This feature is in preview. It is under active development and is not considered ready for production use. You may encounter feature gaps, and the APIs may change. For more information, see the API lifecycle stages documentation.
By default, AutomationCondition.on_cron() evaluates dependencies against an asset's direct parents. When some of those parents are virtual assets such as database views, you may want the condition to look through them to the nearest non-virtual ancestors instead. For more information, see virtual assets.
The .resolve_through_virtual() modifier causes all dependency-related sub-conditions (such as all_deps_updated_since_cron()) to resolve through virtual assets. This means the condition will wait for all non-virtual ancestors to be updated, skipping over any virtual assets in the graph:
import dagster as dg
condition = dg.AutomationCondition.on_cron("@hourly").resolve_through_virtual()
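The idea of resolving through virtual assets can be pictured as a graph walk. Starting from the asset, you walk upstream, step over any virtual node to reach its parents, and stop at the first non-virtual ancestor on each path. The sketch below is a toy model of that traversal, not Dagster's implementation. The graph and the asset names are hypothetical.

```python
# Toy model (not Dagster's implementation) of resolving dependencies through
# virtual assets: walk upstream, look through virtual nodes, and collect the
# nearest non-virtual ancestors. The graph and asset names are hypothetical.


def nearest_non_virtual_ancestors(asset, parents, virtual):
    found = set()
    stack = list(parents.get(asset, ()))
    seen = set()
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        if node in virtual:
            # Skip over the virtual asset and continue to its own parents.
            stack.extend(parents.get(node, ()))
        else:
            found.add(node)
    return found


parents = {
    "report": ["orders_view", "customers"],
    "orders_view": ["orders_raw"],  # orders_view is a database view
}
virtual = {"orders_view"}

print(sorted(parents["report"]))  # ['customers', 'orders_view']
print(sorted(nearest_non_virtual_ancestors("report", parents, virtual)))  # ['customers', 'orders_raw']
```

In this model, the condition on report would wait on orders_raw and customers rather than on the view.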
Updating older time partitions
By default, AutomationCondition.on_cron() targets the latest time partition of an asset.
If you instead want to update partitions on a delay, you can replace its in_latest_time_window sub-condition with one that targets the partition a fixed lag behind the latest time window. In this example, the condition targets the partition from five days ago:
from datetime import timedelta
import dagster as dg
five_days_ago_condition = dg.AutomationCondition.in_latest_time_window(
    timedelta(days=5)
) & ~dg.AutomationCondition.in_latest_time_window(timedelta(days=4))
condition = dg.AutomationCondition.on_cron("@daily").replace(
    "in_latest_time_window", five_days_ago_condition
)
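To see which partition five_days_ago_condition selects, the following sketch works through the set arithmetic for a daily-partitioned asset. It is a simplified model, not Dagster's implementation. It assumes that in_latest_time_window(lookback) covers the whole days within lookback of the end of the latest complete daily window. Under that assumption, a 5-day lookback covers the latest partition and the four before it.

```python
from datetime import date, timedelta

# Simplified model (not Dagster's implementation) of which daily partitions
# in_latest_time_window(lookback) covers, assuming the lookback extends back
# from the end of the latest complete daily window and includes whole days only.


def daily_partitions_in_window(today, lookback):
    end = today  # the latest complete partition is the day before `today`
    start = end - lookback
    return {start + timedelta(days=i) for i in range((end - start).days)}


today = date(2024, 5, 10)
five = daily_partitions_in_window(today, timedelta(days=5))
four = daily_partitions_in_window(today, timedelta(days=4))

# in_latest_time_window(5 days) & ~in_latest_time_window(4 days)
target = five - four
print(sorted(target))  # [datetime.date(2024, 5, 5)]
```

The AND-NOT leaves exactly one partition, the one five days before the current date. Each daily evaluation therefore targets a single lagged partition.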