Preventing runs when reactivating a sensor
When you reactivate an automation condition sensor that has been off for an extended period, the default AutomationCondition.eager() condition triggers execution for every partition that became eligible while the sensor was off. For high-cardinality or expensive workloads (GPU jobs, large batch transforms), this can produce thousands of unwanted runs the moment the sensor turns back on.
This guide shows how to write a time-aware automation condition that only triggers for newly updated dependencies going forward, ignoring the gap during which the sensor was inactive.
Preview before reactivating
Before reactivating, click into the target section of your automation condition sensor in the Dagster UI to preview which assets and partitions would be triggered. This is the cheapest way to confirm whether the default condition would cause a flood of runs.
Time-aware automation condition
Replace the default condition with one that gates execution on a cron tick:
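import dagster as dg

automation_condition = (
    dg.AutomationCondition.any_deps_match(
        # A dependency counts only if it was updated since the last daily cron tick...
        dg.AutomationCondition.newly_updated().since(
            dg.AutomationCondition.cron_tick_passed("0 0 * * *")
        )
        # ...and that update did not come from a run that also targets this asset.
        & ~dg.AutomationCondition.executed_with_root_target()
    ).newly_true()
    & ~dg.AutomationCondition.in_progress()
    & dg.AutomationCondition.in_latest_time_window()
)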
This condition triggers only when a dependency was updated since the most recent cron tick, rather than at any point during the sensor's downtime.
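As a rough mental model of the statement above, the following plain-Python sketch classifies dependency update timestamps against the most recent daily tick. It does not call Dagster and is not how Dagster evaluates the condition internally (the real evaluation is stateful and may differ at the edges). The function names and sample timestamps are hypothetical, chosen only for illustration.

```python
from datetime import datetime


def most_recent_daily_tick(now: datetime) -> datetime:
    """Latest midnight at or before `now`, modelling the "0 0 * * *" cron schedule."""
    return now.replace(hour=0, minute=0, second=0, microsecond=0)


def updates_since_last_tick(update_times, now):
    """Keep only the update timestamps that fall on or after the most recent tick."""
    tick = most_recent_daily_tick(now)
    return [t for t in update_times if t >= tick]


# Hypothetical scenario: the sensor is reactivated at 09:30 on June 10.
now = datetime(2024, 6, 10, 9, 30)
updates = [
    datetime(2024, 6, 3, 14, 0),    # happened during the downtime
    datetime(2024, 6, 9, 23, 59),   # just before the most recent tick
    datetime(2024, 6, 10, 2, 15),   # after the most recent tick
]

recent = updates_since_last_tick(updates, now)
print(recent)
```

In this model only the 02:15 update on June 10 would count. The older updates fall before the tick and are ignored, which is the behavior the gated condition is meant to approximate.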
Roll the change out incrementally:
- Apply the condition to a small subset of assets first.
- Observe behavior for a day or two, watching for unexpected materializations or skips.
- Apply to the remaining downstream assets once you're confident.
Simple conditions like AutomationCondition.any_deps_updated() & ~AutomationCondition.initial_evaluation() do not account for time gaps. They will still trigger for partitions added during the inactive period.
Filtering by date range
For more complex scenarios, define a custom AutomationCondition subclass that filters out partitions older than a fixed cutoff date:
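from datetime import datetime

import dagster as dg

# Example value only: set this to the earliest partition you want to materialize.
# Kept naive (no timezone) so it compares cleanly with the parsed partition keys.
CUTOFF_DATETIME = datetime(2024, 1, 1)

class RecentPartitionsOnlyCondition(dg.AutomationCondition):
    """Only allow materialization of partitions whose key is on or after CUTOFF_DATETIME."""

    def evaluate(self, context: dg.AutomationContext) -> dg.AutomationResult:
        # Unpartitioned assets have nothing to filter, so pass every candidate through.
        if not context.partitions_def:
            return dg.AutomationResult(context, true_subset=context.candidate_subset)
        # Assumes partition keys are ISO 8601 strings such as "2024-01-31".
        all_partitions = context.candidate_subset.expensively_compute_partition_keys()
        recent_partitions = {
            pk for pk in all_partitions if datetime.fromisoformat(pk) >= CUTOFF_DATETIME
        }
        true_subset = context.candidate_subset.compute_intersection_with_partition_keys(
            recent_partitions
        )
        return dg.AutomationResult(context, true_subset=true_subset)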
Use the custom condition in place of the time-aware example above when the cron-tick approach is too coarse for your use case.
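Before wiring the custom condition into a sensor, it can help to check how your partition keys parse. The sketch below isolates the filtering step in plain Python, with no Dagster dependency. The helper name filter_recent_keys, the cutoff value, and the sample keys are all hypothetical. Unlike the condition above, which would raise on a key that is not ISO 8601, this helper skips such keys; treat that as one possible design choice rather than the required behavior.

```python
from datetime import datetime

# Hypothetical cutoff, for illustration only.
CUTOFF_DATETIME = datetime(2024, 6, 1)


def filter_recent_keys(partition_keys, cutoff):
    """Return the set of ISO 8601 partition keys dated on or after `cutoff`."""
    recent = set()
    for pk in partition_keys:
        try:
            parsed = datetime.fromisoformat(pk)
        except ValueError:
            # Key is not an ISO 8601 date; skip it instead of failing.
            continue
        if parsed >= cutoff:
            recent.add(pk)
    return recent


# Hypothetical daily partition keys, plus one malformed key.
keys = ["2024-05-30", "2024-05-31", "2024-06-01", "2024-06-02", "not-a-date"]
recent_keys = filter_recent_keys(keys, CUTOFF_DATETIME)
print(sorted(recent_keys))
```

With these sample keys, only "2024-06-01" and "2024-06-02" survive the filter. If your partitions use a different key format (for example, hourly keys), the parsing step would need to change accordingly.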
Prevention
When you know a sensor will be deactivated for an extended period, design the condition with a time-aware bound from the start. That way, reactivation does not produce a backlog burst regardless of how long the sensor was off.