Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DS][x/n] ScheduledSince condition instead of ScheduledSince delta #21953

Conversation

OwenKephart
Copy link
Contributor

@OwenKephart OwenKephart commented May 17, 2024

Summary & Motivation

I did a lot of internal back-and-forth on this, and what I landed on was that we very well may want to add a "ScheduledSinceDelta" at some point in the future, but that we should wait for concrete use cases before adding it (and at that point in time, we'll probably want a "ScheduledSinceCron" as well). At that point in time, we may think about how to best factor that implementation to make it more similar to the current one vs. the implementation I just removed (which was significantly more complicated).

As implemented, this condition (mostly) restores the current-world AMP eager behavior, which only "tries" a single time to fill in a missing partition or to respond to a parent update. This eliminates the need to add an additional check for failed runs (with a "cooldown"), as we get to set the "parent updated or missing" clause to False as soon as we request a run, rather than having to wait until it actually does become False (which almost certainly won't happen by next tick, and might not happen ever).

This also means we could remove the "in progress" check, which was added for similar reasons (i.e. once you request an asset because it was missing, it probably won't be filled in by the next tick , so you wait until the in progress run finished), but I think that is actually a reasonably useful check regardless (we've seen a fair number of complaints about the existing eager behavior kicking off multiple concurrent runs, so we can just filter those out by default, and if someone wants a different behavior they can remove that condition).

The eventual goal is to have this become the backbone of a sensor-analog replacement. i.e. you will be able to do something like the following:

class NewFileOnS3(SchedulingCondition):

    def evaluate(self, context: SchedulingContext) -> SchedulingResult:
        if self.new_file_since_previous_tick(context):
            true_slice = context.candidate_slice
         else:
            true_slice = context.create_empty_slice()
         return SchedulingResult.create(true_slice)

my_schedule_selection = AssetSelection.something()

# materialize if you haven't been requested since the the new file
# landed on s3
my_scheduley_condition = ~SchedulingCondition.requested_since(NewFileOnS3())

defs = Definitions(...).with_asset_specs_mapped(
    lambda spec: spec.with_attributes(
        scheduling=my_scheduly_condition | spec.scheduling
    ),
    asset_selection=my_schedule_selection,
)

TBD on exactly how much of that we abstract away. This also relies on the fact that we will be emitting Executions from within DS, as this means we can reliably kick off a run that targets all assets with this policy on the tick that NewFileOnS3 becomes true (so we don't have do to any complicated listening for parent updates at the partition boundaries)

How I Tested These Changes

Copy link
Contributor Author

OwenKephart commented May 17, 2024

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more

This stack of pull requests is managed by Graphite. Learn more about stacking.

Join @OwenKephart and the rest of your teammates on Graphite Graphite

@OwenKephart OwenKephart force-pushed the 05-14-Allow_conditions_which_target_dependencies_to_target_specific_dependencies branch from d06ca84 to 1bc7987 Compare May 17, 2024 21:49
@OwenKephart OwenKephart force-pushed the 05-17-ScheduledSince_condition_instead_of_ScheduledSince_delta branch from 1ef4c1c to 225f872 Compare May 17, 2024 21:49
@OwenKephart OwenKephart changed the title [DS][x/n] ScheduledSince condition instead of ScheduledSince delta [DS][45/n] ScheduledSince condition instead of ScheduledSince delta May 17, 2024
@OwenKephart OwenKephart requested a review from schrockn May 17, 2024 22:07
@OwenKephart OwenKephart changed the title [DS][45/n] ScheduledSince condition instead of ScheduledSince delta [DS][x/n] ScheduledSince condition instead of ScheduledSince delta May 20, 2024
@OwenKephart OwenKephart marked this pull request as draft May 20, 2024 22:49
@OwenKephart OwenKephart force-pushed the 05-14-Allow_conditions_which_target_dependencies_to_target_specific_dependencies branch from 1bc7987 to abdf548 Compare May 20, 2024 23:21
@OwenKephart OwenKephart force-pushed the 05-17-ScheduledSince_condition_instead_of_ScheduledSince_delta branch from 225f872 to 2e0af0b Compare May 20, 2024 23:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant