-
Notifications
You must be signed in to change notification settings - Fork 337
[flyte deck] Streaming Decks #2779
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 55 commits
01182b4
9616fc3
98ae7c7
4e92bb0
4df18b5
ebb4d4e
99522d9
c19d67d
67cd829
06da3df
6d99d69
b805cd7
4c97758
7b3574a
9b60564
18c994f
39f39d1
aabcbbb
9ca43f3
ed56352
b559fc9
fc5578f
7139468
e0aee9e
3727588
ce3ee15
d066231
f9387ce
f14c3fa
c33a909
8666c60
93580d6
bcaaabd
a321700
6464fae
884943c
d4b5b96
406227c
1e77f54
d70a2d5
7fc6393
6980140
c87a342
f609760
473ae11
008fe52
f0b9028
d48efa9
6b55930
b5912fb
a681ccd
048fdff
7bcf15e
b6c41c3
be02f9f
cf83e06
e137328
a59a56e
b8383be
dc6d203
41d8760
b71cc19
b5976fe
0c1a5a3
d082456
4a8c68f
b58527b
2764ed4
90372db
d8c408c
5cacf11
c447793
c5cc967
1d6417a
b0cd1ae
974b882
ea8b6e0
a642c9d
e9aef35
04775d7
34a4146
f30382b
b649f8f
0565282
c0fd23a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -129,6 +129,7 @@ class TaskMetadata(object): | |||||||||||||
| timeout (Optional[Union[datetime.timedelta, int]]): the max amount of time for which one execution of this task | ||||||||||||||
| should be executed for. The execution will be terminated if the runtime exceeds the given timeout | ||||||||||||||
| (approximately) | ||||||||||||||
| :param bool generates_deck: Whether the task will generate a Deck URI. | ||||||||||||||
| pod_template_name (Optional[str]): the name of existing PodTemplate resource in the cluster which will be used in this task. | ||||||||||||||
| """ | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -141,6 +142,7 @@ class TaskMetadata(object): | |||||||||||||
| retries: int = 0 | ||||||||||||||
| timeout: Optional[Union[datetime.timedelta, int]] = None | ||||||||||||||
| pod_template_name: Optional[str] = None | ||||||||||||||
| generates_deck: bool = False | ||||||||||||||
| is_eager: bool = False | ||||||||||||||
|
|
||||||||||||||
| def __post_init__(self): | ||||||||||||||
|
|
@@ -179,6 +181,7 @@ def to_taskmetadata_model(self) -> _task_model.TaskMetadata: | |||||||||||||
| discovery_version=self.cache_version, | ||||||||||||||
| deprecated_error_message=self.deprecated, | ||||||||||||||
| cache_serializable=self.cache_serialize, | ||||||||||||||
| generates_deck=self.generates_deck, | ||||||||||||||
|
eapolinario marked this conversation as resolved.
|
||||||||||||||
| pod_template_name=self.pod_template_name, | ||||||||||||||
| cache_ignore_input_vars=self.cache_ignore_input_vars, | ||||||||||||||
| is_eager=self.is_eager, | ||||||||||||||
|
|
@@ -720,7 +723,9 @@ def dispatch_execute( | |||||||||||||
| may be none | ||||||||||||||
| * ``DynamicJobSpec`` is returned when a dynamic workflow is executed | ||||||||||||||
| """ | ||||||||||||||
| if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None: | ||||||||||||||
| if not self.disable_deck: | ||||||||||||||
| ctx.user_space_params._enable_deck = True # type: ignore | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok I'll move it to the builder, thank you |
||||||||||||||
| if DeckField.TIMELINE.value in self.deck_fields and ctx.user_space_params is not None and not self.disable_deck: | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider combining deck disable checks
Consider combining the two conditional checks into a single condition to avoid redundant Code suggestionCheck the AI-generated fix before applying
Suggested change
Code Review Run #24744d Is this a valid issue, or was it incorrectly flagged by the Agent?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah we can do this, small change. |
||||||||||||||
| ctx.user_space_params.decks.append(ctx.user_space_params.timeline_deck) | ||||||||||||||
| # Invoked before the task is executed | ||||||||||||||
| new_user_params = self.pre_execute(ctx.user_space_params) | ||||||||||||||
|
|
||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -86,6 +86,16 @@ def name(self) -> str: | |||||||||||||
| def html(self) -> str: | ||||||||||||||
| return self._html | ||||||||||||||
|
|
||||||||||||||
| @classmethod | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. technically this can be static right?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes static is better I think
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since static method is bound to a class rather than the objects for that class. reference: https://www.digitalocean.com/community/tutorials/python-static-method |
||||||||||||||
| def publish(cls): | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deck.publish, pulls the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes we can do this, this can make the code more efficient
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now it changed to warning, and move the logic to |
||||||||||||||
| params = FlyteContextManager.current_context().user_space_params | ||||||||||||||
| if params.enable_deck: | ||||||||||||||
| task_name = params.task_id.name | ||||||||||||||
| _output_deck(task_name=task_name, new_user_params=params) | ||||||||||||||
| else: | ||||||||||||||
| # todo: change to a more proper error | ||||||||||||||
| raise ValueError("Deck is disabled for this task, please don't call Deck.publish()") | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider using more specific exception type
Consider using a more specific exception type like Code suggestionCheck the AI-generated fix before applying
Suggested change
Code Review Run #24744d Is this a valid issue, or was it incorrectly flagged by the Agent?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we just log a warning instead of raising an error? the warning can say that the call to publish is being ignored. that way the task can continue to function.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also let's remove the warning on line 46: "This feature is in beta."
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok I'm going to call |
||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| class TimeLineDeck(Deck): | ||||||||||||||
| """ | ||||||||||||||
|
|
@@ -148,7 +158,8 @@ def generate_time_table(data: dict) -> str: | |||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| def _get_deck( | ||||||||||||||
| new_user_params: ExecutionParameters, ignore_jupyter: bool = False | ||||||||||||||
| new_user_params: ExecutionParameters, | ||||||||||||||
| ignore_jupyter: bool = False, | ||||||||||||||
| ) -> typing.Union[str, "IPython.core.display.HTML"]: # type:ignore | ||||||||||||||
| """ | ||||||||||||||
| Get flyte deck html string | ||||||||||||||
|
|
@@ -180,7 +191,7 @@ def _output_deck(task_name: str, new_user_params: ExecutionParameters): | |||||||||||||
| local_path = f"{local_dir}{os.sep}{DECK_FILE_NAME}" | ||||||||||||||
| try: | ||||||||||||||
| with open(local_path, "w", encoding="utf-8") as f: | ||||||||||||||
| f.write(_get_deck(new_user_params, ignore_jupyter=True)) | ||||||||||||||
| f.write(_get_deck(new_user_params=new_user_params, ignore_jupyter=True)) | ||||||||||||||
| logger.info(f"{task_name} task creates flyte deck html to file://{local_path}") | ||||||||||||||
| if ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION: | ||||||||||||||
| fs = ctx.file_access.get_filesystem_for_path(new_user_params.output_metadata_prefix) | ||||||||||||||
|
|
@@ -197,6 +208,7 @@ def _output_deck(task_name: str, new_user_params: ExecutionParameters): | |||||||||||||
| def get_deck_template() -> Template: | ||||||||||||||
| root = os.path.dirname(os.path.abspath(__file__)) | ||||||||||||||
| templates_dir = os.path.join(root, "html", "template.html") | ||||||||||||||
|
|
||||||||||||||
| with open(templates_dir, "r") as f: | ||||||||||||||
| template_content = f.read() | ||||||||||||||
| return Template(template_content) | ||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -180,6 +180,7 @@ def __init__( | |||||
| pod_template_name, | ||||||
| cache_ignore_input_vars, | ||||||
| is_eager: bool = False, | ||||||
| generates_deck: bool = False, | ||||||
| ): | ||||||
| """ | ||||||
| Information needed at runtime to determine behavior such as whether or not outputs are discoverable, timeouts, | ||||||
|
|
@@ -199,6 +200,7 @@ def __init__( | |||||
| receive deprecation warnings. | ||||||
| :param bool cache_serializable: Whether or not caching operations are executed in serial. This means only a | ||||||
| single instance over identical inputs is executed, other concurrent executions wait for the cached results. | ||||||
| :param bool generates_deck: Whether the task will generate a Deck URI. | ||||||
|
pingsutw marked this conversation as resolved.
|
||||||
| :param pod_template_name: The name of the existing PodTemplate resource which will be used in this task. | ||||||
| :param cache_ignore_input_vars: Input variables that should not be included when calculating hash for cache. | ||||||
| :param is_eager: | ||||||
|
|
@@ -214,6 +216,7 @@ def __init__( | |||||
| self._pod_template_name = pod_template_name | ||||||
| self._cache_ignore_input_vars = cache_ignore_input_vars | ||||||
| self._is_eager = is_eager | ||||||
| self._generates_deck = generates_deck | ||||||
|
|
||||||
| @property | ||||||
| def is_eager(self): | ||||||
|
|
@@ -295,6 +298,14 @@ def pod_template_name(self): | |||||
| """ | ||||||
| return self._pod_template_name | ||||||
|
|
||||||
| @property | ||||||
| def generates_deck(self) -> bool: | ||||||
| """ | ||||||
| Whether the task will generate a Deck URI. | ||||||
|
pingsutw marked this conversation as resolved.
Outdated
|
||||||
| :rtype: bool | ||||||
| """ | ||||||
| return self._generates_deck | ||||||
|
|
||||||
| @property | ||||||
| def cache_ignore_input_vars(self): | ||||||
| """ | ||||||
|
|
@@ -315,6 +326,7 @@ def to_flyte_idl(self): | |||||
| discovery_version=self.discovery_version, | ||||||
| deprecated_error_message=self.deprecated_error_message, | ||||||
| cache_serializable=self.cache_serializable, | ||||||
| generates_deck=self.generates_deck, | ||||||
| pod_template_name=self.pod_template_name, | ||||||
| cache_ignore_input_vars=self.cache_ignore_input_vars, | ||||||
| is_eager=self.is_eager, | ||||||
|
|
@@ -338,6 +350,7 @@ def from_flyte_idl(cls, pb2_object: _core_task.TaskMetadata): | |||||
| discovery_version=pb2_object.discovery_version, | ||||||
| deprecated_error_message=pb2_object.deprecated_error_message, | ||||||
| cache_serializable=pb2_object.cache_serializable, | ||||||
| generates_deck=pb2_object.generates_deck, | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider validating generates_deck field existence
Consider validating the Code suggestionCheck the AI-generated fix before applying
Suggested change
Code Review Run #077cc1 Is this a valid issue, or was it incorrectly flagged by the Agent?
|
||||||
| pod_template_name=pb2_object.pod_template_name, | ||||||
| cache_ignore_input_vars=pb2_object.cache_ignore_input_vars, | ||||||
| is_eager=pb2_object.is_eager, | ||||||
|
|
||||||

Uh oh!
There was an error while loading. Please reload this page.