Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions planemo/galaxy/invocations/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@


class InvocationStep(TypedDict, total=False):
id: str
state: Optional[str]
subworkflow_invocation_id: Optional[str]

Expand Down
6 changes: 4 additions & 2 deletions planemo/galaxy/invocations/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,10 @@ def an_incomplete_subworkflow_id(self):
return random.choice(tuple(self.subworkflow_invocation_ids_seen - self.subworkflow_invocation_ids_completed))

def all_subworkflows_complete(self):
if self.new_steps:
# These don't have subworkflow invocation ids yet, we can't know if they're all complete
if self.new_steps and not self.workflow_progress.invocation_scheduling_terminal:
# new steps may still become subworkflows, so we can't tell if all are complete yet.
# Once scheduling is terminal _poll_main_workflow stops refreshing the invocation, so
# new_steps is frozen - blocking on it then would never return.
return False
return len(self.subworkflow_invocation_ids_seen) == len(self.subworkflow_invocation_ids_completed)

Expand Down
2 changes: 2 additions & 0 deletions planemo/galaxy/invocations/simulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(
self, jobs: List[Job], invocation: Optional["Invocation"], after: int, states: List[StateWithDuration]
):
super().__init__(after, states)
self.id = "step_id"
self.jobs = jobs
self.invocation = invocation

Expand Down Expand Up @@ -135,6 +136,7 @@ def get_api_invocation(self) -> InvocationResponse:
steps: List[InvocationStepResponse] = []
for step in self.active_steps:
api_step: InvocationStepResponse = {
"id": step.id,
"state": step.state,
}
if step.invocation:
Expand Down
22 changes: 22 additions & 0 deletions tests/test_invocation_polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,26 @@
SCENARIO_MULTIPLE_OK_SUBWORKFLOWS,
SCENARIO_NESTED_SUBWORKFLOWS,
SCENARIO_SUBWORKFLOW_WITH_FAILED_JOBS,
SCENARIO_TERMINAL_WITH_NEW_STEP,
)

SLEEP = 0


class MockPollingTracker(PollingTracker):
# The poll loop has no timeout, so cap ticks to fail fast instead of hanging the suite
# if polling ever stops terminating.
MAX_TICKS = 1000

def __init__(self, simulation: Invocation):
self._simulation = simulation
self._ticks = 0

def sleep(self) -> None:
self._ticks += 1
assert (
self._ticks <= self.MAX_TICKS
), f"invocation poll loop did not terminate after {self.MAX_TICKS} ticks - likely deadlocked"
self._simulation.tick()
if SLEEP > 0:
sleep(SLEEP)
Expand Down Expand Up @@ -159,6 +169,18 @@ def test_fail_fast_enabled_with_subworkflow_job_failure():
assert "Failed to run workflow, at least one job is in [error] state." in error_message


def test_polling_terminates_with_new_step_and_paused_job():
"""Polling must terminate when a step stays 'new' behind a paused/errored branch.

Without the fix this never returns and trips MockPollingTracker.MAX_TICKS.
"""
final_invocation_state, job_state, error_message = run_workflow_simulation(
SCENARIO_TERMINAL_WITH_NEW_STEP, fail_fast=False
)
assert final_invocation_state == "ready"
assert job_state == "error"


def run_workflow_simulation(
yaml_str: str, display_configuration: Optional[DisplayConfiguration] = None, fail_fast: bool = False
):
Expand Down
17 changes: 17 additions & 0 deletions tests/test_workflow_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,23 @@
"""


# Regression for the hang in galaxyproject/iwc job 77835961979: an errored step pauses a
# downstream step while a third step is still "new". With a paused + errored job and "ready"
# state, the invocation counts as terminal, so polling stops refreshing it and new_steps stays
# non-empty. all_subworkflows_complete() then used to block on it forever.
SCENARIO_TERMINAL_WITH_NEW_STEP = """
states: [new, ready, scheduled]
steps:
- state: scheduled
jobs:
- states: [new, error]
- state: scheduled
jobs:
- states: [new, paused]
- state: new
"""


def test_parse_scenario_1_invocation_state_evolution():
invocation = parse_workflow_simulation_from_string(SCENARIO_1)
invocation_dict = invocation.get_api_invocation()
Expand Down