From fcb07265169cff9e2b8cdedb21d022a37eeed2aa Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Mon, 30 Mar 2026 14:51:05 +0800 Subject: [PATCH 01/17] add option to ray cluster Signed-off-by: Yuteng Chen --- plugins/flytekit-ray/flytekitplugins/ray/task.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 9793e9d5d9..5ecd652449 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -56,12 +56,20 @@ def __post_init__(self): if self.requests and self.limits: raise ValueError("Cannot specify both pod_template and requests/limits") +@dataclass +class AutoscalerOptions: + upscaling_mode: Optional[str] = None + idle_timeout_seconds: Optional[int] = None + env: Optional[Dict[str, str]] = None + image: Optional[str] = None + resources: Optional[Resources] = None @dataclass class RayJobConfig: worker_node_config: typing.List[WorkerNodeConfig] head_node_config: typing.Optional[HeadNodeConfig] = None enable_autoscaling: bool = False + autoscaler_options: Optional[AutoscalerOptions] = None runtime_env: typing.Optional[dict] = None address: typing.Optional[str] = None shutdown_after_job_finishes: bool = False From 9cfafc129d9c6a5a51a715f59e30b266cba49d93 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Tue, 31 Mar 2026 10:33:06 +0800 Subject: [PATCH 02/17] convert options from user to grpc options Signed-off-by: Yuteng Chen --- .../flytekitplugins/ray/models.py | 61 +++++++++++++++++++ .../flytekit-ray/flytekitplugins/ray/task.py | 19 +++++- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 1f3a830f16..47236c82f8 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -146,6 +146,59 @@ def from_flyte_idl(cls, proto): ) +class AutoscalerOptions(_common.FlyteIdlEntity): + def __init__( + self, + upscaling_mode: Optional[str] = None, + idle_timeout_seconds: Optional[int] = None, + image: Optional[str] = None, + env: Optional[Dict[str, str]] = None, + requests: Optional[Resources] = None, + limits: Optional[Resources] =None + ): + self._upscaling_mode = upscaling_mode + self._idle_timeout_seconds = idle_timeout_seconds + self._image = image + self._env = env + self._requests = requests + self._limits = limits + + @property + def upscaling_mode(self) -> Optional[str]: + return self._upscaling_mode + + @property + def idle_timeout_seconds(self) -> Optional[int]: + return self._idle_timeout_seconds + + @property + def image(self) -> Optional[str]: + return self._image + + @property + def env(self) -> Optional[Dict[str, str]]: + return self._env + + def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: + envs List[_ray_pb2.Envvar] = [] + for key, val in self.env.items(): + envs.append(_ray_pb2.Envvar(name=key, value=val)) + return _ray_pb2.AutoscalerOptions( + upscaling_mode = self.upscaling_mode, + idle_timeout_seconds = self.idle_timeout_seconds, + image = self.image, + env = envs if len(envs) > 0 else None, + ) + + @classmethod + def from_flyte_idl(cls, proto): + return cls( + upscaling_mode = proto.upscaling_mode, + idle_timeout_seconds = proto.idle_timeout_seconds, + image = proto.image, + ) + + class RayCluster(_common.FlyteIdlEntity): """ Define RayCluster spec that will be used by KubeRay to launch the cluster. @@ -156,10 +209,12 @@ def __init__( worker_group_spec: typing.List[WorkerGroupSpec], head_group_spec: typing.Optional[HeadGroupSpec] = None, enable_autoscaling: bool = False, + autoscaler_options: typing.Optional[AutoscalerOptions] = None ): self._head_group_spec = head_group_spec self._worker_group_spec = worker_group_spec self._enable_autoscaling = enable_autoscaling + self._autoscaler_options = autoscaler_options @property def head_group_spec(self) -> HeadGroupSpec: @@ -184,6 +239,10 @@ def enable_autoscaling(self) -> bool: :rtype: bool """ return self._enable_autoscaling + + @property + def autoscaler_options(self) -> typing.Optional[AutoscalerOptions]: + return self._autoscaler_options def to_flyte_idl(self) -> _ray_pb2.RayCluster: """ @@ -193,6 +252,7 @@ def to_flyte_idl(self) -> _ray_pb2.RayCluster: head_group_spec=self.head_group_spec.to_flyte_idl() if self.head_group_spec else None, worker_group_spec=[wg.to_flyte_idl() for wg in self.worker_group_spec], enable_autoscaling=self.enable_autoscaling, + autoscaler_options=self.autoscaler_options.to_flyte_idl(), ) @classmethod @@ -205,6 +265,7 @@ def from_flyte_idl(cls, proto): head_group_spec=HeadGroupSpec.from_flyte_idl(proto.head_group_spec) if proto.head_group_spec else None, worker_group_spec=[WorkerGroupSpec.from_flyte_idl(wg) for wg in proto.worker_group_spec], enable_autoscaling=proto.enable_autoscaling, + autoscaler_options=AutoscalerOptions.from_flyte_idl(proto.autoscaler_options), ) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 5ecd652449..fb79bd5d43 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -57,19 +57,20 @@ def __post_init__(self): raise ValueError("Cannot specify both pod_template and requests/limits") @dataclass -class AutoscalerOptions: +class AutoscalerOptionsConfig: upscaling_mode: Optional[str] = None idle_timeout_seconds: Optional[int] = None env: Optional[Dict[str, str]] = None image: Optional[str] = None - resources: Optional[Resources] = None + requests: Optional[Resources] = None + limits: Optional[Resources] = None @dataclass class RayJobConfig: worker_node_config: typing.List[WorkerNodeConfig] head_node_config: typing.Optional[HeadNodeConfig] = None enable_autoscaling: bool = False - autoscaler_options: Optional[AutoscalerOptions] = None + autoscaler_options: Optional[AutoscalerOptionsConfig] = None runtime_env: typing.Optional[dict] = None address: typing.Optional[str] = None shutdown_after_job_finishes: bool = False @@ -148,12 +149,24 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] worker_group_spec.append( WorkerGroupSpec(c.group_name, c.replicas, c.min_replicas, c.max_replicas, c.ray_start_params, k8s_pod) ) + + autoscalerOptions = None + if cfg.autoscaler_options is not None: + autoscalerOptions = AutoscalerOptions( + upscaling_mode = cfg.autoscaler_options.upscaling_mode, + idle_timeout_seconds = cfg.autoscaler_options.idle_timeout_seconds, + image = cfg.autoscaler_options.image, + env = cfg.autoscaler_options.env, + requests = cfg.autoscaler_options.requests, + limits = cfg.autoscaler_options.limits, + ) ray_job = RayJob( ray_cluster=RayCluster( head_group_spec=head_group_spec, worker_group_spec=worker_group_spec, enable_autoscaling=(cfg.enable_autoscaling if cfg.enable_autoscaling else False), + autoscalerOptions=autoscalerOptions, ), runtime_env=runtime_env, runtime_env_yaml=runtime_env_yaml, From 97c5daed8e692c95dddbb5ac7fdaff98def914af Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Tue, 31 Mar 2026 10:55:40 +0800 Subject: [PATCH 03/17] add resource convert Signed-off-by: Yuteng Chen --- .../flytekitplugins/ray/models.py | 100 +++++++++++++----- .../flytekit-ray/flytekitplugins/ray/task.py | 16 +-- 2 files changed, 81 insertions(+), 35 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 47236c82f8..020130e616 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -2,10 +2,34 @@ from flyteidl.plugins import ray_pb2 as _ray_pb2 +from flytekit import Resources from flytekit.models import common as _common from flytekit.models.task import K8sPod +def _flytekit_resources_to_ray_entries(resources: Resources) -> typing.List[_ray_pb2.Resources.ResourceEntry]: + """Convert flytekit.Resources cpu/mem fields to a list of _ray_pb2.Resources.ResourceEntry.""" + entries = [] + if resources.cpu is not None: + val = resources.cpu[0] if isinstance(resources.cpu, (list, tuple)) else resources.cpu + entries.append(_ray_pb2.Resources.ResourceEntry(name=_ray_pb2.Resources.CPU, value=str(val))) + if resources.mem is not None: + val = resources.mem[0] if isinstance(resources.mem, (list, tuple)) else resources.mem + entries.append(_ray_pb2.Resources.ResourceEntry(name=_ray_pb2.Resources.MEMORY, value=str(val))) + return entries + + +def _ray_entries_to_flytekit_resources(entries) -> typing.Optional[Resources]: + """Convert a list of _ray_pb2.Resources.ResourceEntry back to flytekit.Resources.""" + cpu, mem = None, None + for entry in entries: + if entry.name == _ray_pb2.Resources.CPU: + cpu = entry.value + elif entry.name == _ray_pb2.Resources.MEMORY: + mem = entry.value + return Resources(cpu=cpu, mem=mem) if (cpu or mem) else None + + class WorkerGroupSpec(_common.FlyteIdlEntity): def __init__( self, @@ -149,12 +173,12 @@ def from_flyte_idl(cls, proto): class AutoscalerOptions(_common.FlyteIdlEntity): def __init__( self, - upscaling_mode: Optional[str] = None, - idle_timeout_seconds: Optional[int] = None, - image: Optional[str] = None, - env: Optional[Dict[str, str]] = None, - requests: Optional[Resources] = None, - limits: Optional[Resources] =None + upscaling_mode: typing.Optional[str] = None, + idle_timeout_seconds: typing.Optional[int] = None, + image: typing.Optional[str] = None, + env: typing.Optional[typing.Dict[str, str]] = None, + requests: typing.Optional[Resources] = None, + limits: typing.Optional[Resources] = None, ): self._upscaling_mode = upscaling_mode self._idle_timeout_seconds = idle_timeout_seconds @@ -162,40 +186,60 @@ def __init__( self._env = env self._requests = requests self._limits = limits - + @property - def upscaling_mode(self) -> Optional[str]: + def upscaling_mode(self) -> typing.Optional[str]: return self._upscaling_mode - + @property - def idle_timeout_seconds(self) -> Optional[int]: + def idle_timeout_seconds(self) -> typing.Optional[int]: return self._idle_timeout_seconds - + @property - def image(self) -> Optional[str]: + def image(self) -> typing.Optional[str]: return self._image - + @property - def env(self) -> Optional[Dict[str, str]]: + def env(self) -> typing.Optional[typing.Dict[str, str]]: return self._env - + + @property + def requests(self): + return self._requests + + @property + def limits(self): + return self._limits + def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: - envs List[_ray_pb2.Envvar] = [] - for key, val in self.env.items(): - envs.append(_ray_pb2.Envvar(name=key, value=val)) + envs = [] + if self.env: + for key, val in self.env.items(): + envs.append(_ray_pb2.EnvVar(name=key, value=val)) + ray_resources = None + if self.requests or self.limits: + ray_resources = _ray_pb2.Resources( + requests=_flytekit_resources_to_ray_entries(self.requests) if self.requests else [], + limits=_flytekit_resources_to_ray_entries(self.limits) if self.limits else [], + ) return _ray_pb2.AutoscalerOptions( - upscaling_mode = self.upscaling_mode, - idle_timeout_seconds = self.idle_timeout_seconds, - image = self.image, - env = envs if len(envs) > 0 else None, + upscaling_mode=self.upscaling_mode, + idle_timeout_seconds=self.idle_timeout_seconds, + image=self.image, + env=envs if envs else None, + resources=ray_resources, ) - + @classmethod def from_flyte_idl(cls, proto): + has_resources = proto.HasField("resources") return cls( - upscaling_mode = proto.upscaling_mode, - idle_timeout_seconds = proto.idle_timeout_seconds, - image = proto.image, + upscaling_mode=proto.upscaling_mode, + idle_timeout_seconds=proto.idle_timeout_seconds, + image=proto.image, + env={e.name: e.value for e in proto.env} if proto.env else None, + requests=_ray_entries_to_flytekit_resources(proto.resources.requests) if has_resources else None, + limits=_ray_entries_to_flytekit_resources(proto.resources.limits) if has_resources else None, ) @@ -209,7 +253,7 @@ def __init__( worker_group_spec: typing.List[WorkerGroupSpec], head_group_spec: typing.Optional[HeadGroupSpec] = None, enable_autoscaling: bool = False, - autoscaler_options: typing.Optional[AutoscalerOptions] = None + autoscaler_options: typing.Optional[AutoscalerOptions] = None, ): self._head_group_spec = head_group_spec self._worker_group_spec = worker_group_spec @@ -239,7 +283,7 @@ def enable_autoscaling(self) -> bool: :rtype: bool """ return self._enable_autoscaling - + @property def autoscaler_options(self) -> typing.Optional[AutoscalerOptions]: return self._autoscaler_options diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index fb79bd5d43..0331bcf854 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -56,6 +56,7 @@ def __post_init__(self): if self.requests and self.limits: raise ValueError("Cannot specify both pod_template and requests/limits") + @dataclass class AutoscalerOptionsConfig: upscaling_mode: Optional[str] = None @@ -65,6 +66,7 @@ class AutoscalerOptionsConfig: requests: Optional[Resources] = None limits: Optional[Resources] = None + @dataclass class RayJobConfig: worker_node_config: typing.List[WorkerNodeConfig] @@ -149,16 +151,16 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] worker_group_spec.append( WorkerGroupSpec(c.group_name, c.replicas, c.min_replicas, c.max_replicas, c.ray_start_params, k8s_pod) ) - + autoscalerOptions = None if cfg.autoscaler_options is not None: autoscalerOptions = AutoscalerOptions( - upscaling_mode = cfg.autoscaler_options.upscaling_mode, - idle_timeout_seconds = cfg.autoscaler_options.idle_timeout_seconds, - image = cfg.autoscaler_options.image, - env = cfg.autoscaler_options.env, - requests = cfg.autoscaler_options.requests, - limits = cfg.autoscaler_options.limits, + upscaling_mode=cfg.autoscaler_options.upscaling_mode, + idle_timeout_seconds=cfg.autoscaler_options.idle_timeout_seconds, + image=cfg.autoscaler_options.image, + env=cfg.autoscaler_options.env, + requests=cfg.autoscaler_options.requests, + limits=cfg.autoscaler_options.limits, ) ray_job = RayJob( From 190b1017862d9f994016250c195a9d906fc3ebf2 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Tue, 31 Mar 2026 11:21:26 +0800 Subject: [PATCH 04/17] fix bug and add test Signed-off-by: Yuteng Chen --- .../flytekitplugins/ray/models.py | 2 +- .../flytekit-ray/flytekitplugins/ray/task.py | 3 ++- plugins/flytekit-ray/tests/test_ray.py | 19 ++++++++++++++++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 020130e616..583fa05a71 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -296,7 +296,7 @@ def to_flyte_idl(self) -> _ray_pb2.RayCluster: head_group_spec=self.head_group_spec.to_flyte_idl() if self.head_group_spec else None, worker_group_spec=[wg.to_flyte_idl() for wg in self.worker_group_spec], enable_autoscaling=self.enable_autoscaling, - autoscaler_options=self.autoscaler_options.to_flyte_idl(), + autoscaler_options=self.autoscaler_options.to_flyte_idl() if self.autoscaler_options else None, ) @classmethod diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 0331bcf854..82945c598d 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -7,6 +7,7 @@ import yaml from flytekitplugins.ray.models import ( + AutoscalerOptions, HeadGroupSpec, RayCluster, RayJob, @@ -168,7 +169,7 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] head_group_spec=head_group_spec, worker_group_spec=worker_group_spec, enable_autoscaling=(cfg.enable_autoscaling if cfg.enable_autoscaling else False), - autoscalerOptions=autoscalerOptions, + autoscaler_options=autoscalerOptions, ), runtime_env=runtime_env, runtime_env_yaml=runtime_env_yaml, diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index c9b00a6dad..388010e0f2 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -7,12 +7,13 @@ from flytekit.core.resources import pod_spec_from_resources from flytekitplugins.ray import HeadNodeConfig from flytekitplugins.ray.models import ( + AutoscalerOptions, HeadGroupSpec, RayCluster, RayJob, WorkerGroupSpec, ) -from flytekitplugins.ray.task import RayJobConfig, WorkerNodeConfig +from flytekitplugins.ray.task import AutoscalerOptionsConfig, RayJobConfig, WorkerNodeConfig from google.protobuf.json_format import MessageToDict from flytekit import PythonFunctionTask, task, PodTemplate, Resources @@ -39,6 +40,14 @@ head_node_config=HeadNodeConfig(requests=Resources(cpu="1", mem="1Gi"), limits=Resources(cpu="2", mem="2Gi")), runtime_env={"pip": ["numpy"]}, enable_autoscaling=True, + autoscaler_options=AutoscalerOptionsConfig( + upscaling_mode="Conservative", + idle_timeout_seconds=120, + image="rayproject/ray:2.9.0", + env={"lKeyA": "lValA"}, + requests=Resources(cpu="1", mem="1Gi"), + limits=Resources(cpu="2", mem="2Gi"), + ), shutdown_after_job_finishes=True, ttl_seconds_after_finished=20, ) @@ -86,6 +95,14 @@ def t1(a: int) -> str: ], head_group_spec=HeadGroupSpec(k8s_pod=K8sPod.from_pod_template(head_pod_template)), enable_autoscaling=True, + autoscaler_options=AutoscalerOptions( + upscaling_mode="Conservative", + idle_timeout_seconds=120, + image="rayproject/ray:2.9.0", + env={"lKeyA": "lValA"}, + requests=Resources(cpu="1", mem="1Gi"), + limits=Resources(cpu="2", mem="2Gi"), + ), ), runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(), runtime_env_yaml=yaml.dump({"pip": ["numpy"]}), From 74e271cca6f85b37f81f6258086d2645a1ee36c8 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Wed, 1 Apr 2026 15:30:05 +0800 Subject: [PATCH 05/17] nested requests and limits Signed-off-by: Yuteng Chen --- .../flytekitplugins/ray/__init__.py | 3 ++- .../flytekit-ray/flytekitplugins/ray/task.py | 18 ++++++++++++++---- plugins/flytekit-ray/tests/test_ray.py | 10 ++++++---- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/__init__.py b/plugins/flytekit-ray/flytekitplugins/ray/__init__.py index ff6fcfd2e6..32b2caf8de 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/__init__.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/__init__.py @@ -9,7 +9,8 @@ HeadNodeConfig RayJobConfig + ResourcesConfig WorkerNodeConfig """ -from .task import HeadNodeConfig, RayJobConfig, WorkerNodeConfig +from .task import HeadNodeConfig, RayJobConfig, WorkerNodeConfig, AutoscalerOptionsConfig, ResourcesConfig diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 82945c598d..b58a7961ce 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -58,14 +58,19 @@ def __post_init__(self): raise ValueError("Cannot specify both pod_template and requests/limits") +@dataclass +class ResourcesConfig: + requests: Optional[Resources] = None + limits: Optional[Resources] = None + + @dataclass class AutoscalerOptionsConfig: upscaling_mode: Optional[str] = None idle_timeout_seconds: Optional[int] = None env: Optional[Dict[str, str]] = None image: Optional[str] = None - requests: Optional[Resources] = None - limits: Optional[Resources] = None + resources: Optional[ResourcesConfig] = None @dataclass @@ -155,13 +160,18 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] autoscalerOptions = None if cfg.autoscaler_options is not None: + requests = None + limits = None + if cfg.autoscaler_options.resources is not None: + requests = cfg.autoscaler_options.resources.requests if cfg.autoscaler_options.resources.requests is not None else None + limits = cfg.autoscaler_options.resources.limits if cfg.autoscaler_options.resources.limits is not None else None autoscalerOptions = AutoscalerOptions( upscaling_mode=cfg.autoscaler_options.upscaling_mode, idle_timeout_seconds=cfg.autoscaler_options.idle_timeout_seconds, image=cfg.autoscaler_options.image, env=cfg.autoscaler_options.env, - requests=cfg.autoscaler_options.requests, - limits=cfg.autoscaler_options.limits, + requests=requests, + limits=limits, ) ray_job = RayJob( diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index 388010e0f2..ad096d43ee 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -13,7 +13,7 @@ RayJob, WorkerGroupSpec, ) -from flytekitplugins.ray.task import AutoscalerOptionsConfig, RayJobConfig, WorkerNodeConfig +from flytekitplugins.ray.task import AutoscalerOptionsConfig, RayJobConfig, ResourcesConfig, WorkerNodeConfig from google.protobuf.json_format import MessageToDict from flytekit import PythonFunctionTask, task, PodTemplate, Resources @@ -45,8 +45,10 @@ idle_timeout_seconds=120, image="rayproject/ray:2.9.0", env={"lKeyA": "lValA"}, - requests=Resources(cpu="1", mem="1Gi"), - limits=Resources(cpu="2", mem="2Gi"), + resources=ResourcesConfig( + requests=Resources(cpu="1", mem="1Gi"), + limits=Resources(cpu="2", mem="2Gi"), + ), ), shutdown_after_job_finishes=True, ttl_seconds_after_finished=20, @@ -183,4 +185,4 @@ def t1(a: int) -> str: "t1", ] - # cannot execute this as it will try to hit a non-existent cluster + # cannot execute this as it will try to hit a non-existent cluster \ No newline at end of file From 7d1f5cae9ccf9163708f7183340f53a61239fb83 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Thu, 2 Apr 2026 12:11:05 +0800 Subject: [PATCH 06/17] remove envvar Signed-off-by: Yuteng Chen --- plugins/flytekit-ray/flytekitplugins/ray/models.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 583fa05a71..4f1a2a84a9 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -1,6 +1,7 @@ import typing from flyteidl.plugins import ray_pb2 as _ray_pb2 +from flyteidl.core import literals_pb2 as _literals_pb2 from flytekit import Resources from flytekit.models import common as _common @@ -212,10 +213,10 @@ def limits(self): return self._limits def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: - envs = [] + envs := [] if self.env: for key, val in self.env.items(): - envs.append(_ray_pb2.EnvVar(name=key, value=val)) + envs.append(_ray_pb2.KeyValuePair(key=key, value=val)) ray_resources = None if self.requests or self.limits: ray_resources = _ray_pb2.Resources( @@ -237,7 +238,7 @@ def from_flyte_idl(cls, proto): upscaling_mode=proto.upscaling_mode, idle_timeout_seconds=proto.idle_timeout_seconds, image=proto.image, - env={e.name: e.value for e in proto.env} if proto.env else None, + env={e.key: e.value for e in proto.env} if proto.env else None, requests=_ray_entries_to_flytekit_resources(proto.resources.requests) if has_resources else None, limits=_ray_entries_to_flytekit_resources(proto.resources.limits) if has_resources else None, ) From 574ef0c9d1a36a5de4d58d6ab49bd6deddc99eaa Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Thu, 2 Apr 2026 15:26:28 +0800 Subject: [PATCH 07/17] use Resources Signed-off-by: Yuteng Chen --- .../flytekitplugins/ray/models.py | 61 +++++++++++-------- .../flytekit-ray/flytekitplugins/ray/task.py | 15 +---- 2 files changed, 36 insertions(+), 40 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 4f1a2a84a9..16b41a9914 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -1,23 +1,42 @@ import typing +from flyteidl.core import tasks_pb2 as _tasks_pb2 from flyteidl.plugins import ray_pb2 as _ray_pb2 -from flyteidl.core import literals_pb2 as _literals_pb2 from flytekit import Resources from flytekit.models import common as _common from flytekit.models.task import K8sPod -def _flytekit_resources_to_ray_entries(resources: Resources) -> typing.List[_ray_pb2.Resources.ResourceEntry]: - """Convert flytekit.Resources cpu/mem fields to a list of _ray_pb2.Resources.ResourceEntry.""" - entries = [] +def _flytekit_resources_to_pb_resources(resources: typing.Optional[Resources]) -> typing.Optional[_tasks_pb2.Resources]: + if resources is None: + return None + + requests: List[_tasks_pb2.Resources.ResourceEntry] = [] + limits: List[_tasks_pb2.Resources.ResourceEntry] = [] if resources.cpu is not None: - val = resources.cpu[0] if isinstance(resources.cpu, (list, tuple)) else resources.cpu - entries.append(_ray_pb2.Resources.ResourceEntry(name=_ray_pb2.Resources.CPU, value=str(val))) + if isinstance(resources.cpu, (list, tuple)): + request = resources.cpu[0] + limit = resources.cpu[1] if len(resource.cpu) == 2 else request + else: + limit = request = resources.cpu + requests.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=request)) + limits.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=limit)) + if resources.mem is not None: - val = resources.mem[0] if isinstance(resources.mem, (list, tuple)) else resources.mem - entries.append(_ray_pb2.Resources.ResourceEntry(name=_ray_pb2.Resources.MEMORY, value=str(val))) - return entries + if isinstance(resources.mem, (list, tuple)): + request = resources.mem[0] + limit = resources.mem[1] if len(resource.mem) == 2 else request + else: + limit = request = resources.mem + requests.append( + _tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=request) + ) + limits.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=limit)) + return _tasks_pb2.Resources( + requests=requests, + limits=limits, + ) def _ray_entries_to_flytekit_resources(entries) -> typing.Optional[Resources]: @@ -178,15 +197,13 @@ def __init__( idle_timeout_seconds: typing.Optional[int] = None, image: typing.Optional[str] = None, env: typing.Optional[typing.Dict[str, str]] = None, - requests: typing.Optional[Resources] = None, - limits: typing.Optional[Resources] = None, + resources: typing.Optional[Resources] = None, ): self._upscaling_mode = upscaling_mode self._idle_timeout_seconds = idle_timeout_seconds self._image = image self._env = env - self._requests = requests - self._limits = limits + self._resources = resources @property def upscaling_mode(self) -> typing.Optional[str]: @@ -205,30 +222,20 @@ def env(self) -> typing.Optional[typing.Dict[str, str]]: return self._env @property - def requests(self): - return self._requests - - @property - def limits(self): - return self._limits + def resources(self): + return self._resources def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: - envs := [] + envs = [] if self.env: for key, val in self.env.items(): envs.append(_ray_pb2.KeyValuePair(key=key, value=val)) - ray_resources = None - if self.requests or self.limits: - ray_resources = _ray_pb2.Resources( - requests=_flytekit_resources_to_ray_entries(self.requests) if self.requests else [], - limits=_flytekit_resources_to_ray_entries(self.limits) if self.limits else [], - ) return _ray_pb2.AutoscalerOptions( upscaling_mode=self.upscaling_mode, idle_timeout_seconds=self.idle_timeout_seconds, image=self.image, env=envs if envs else None, - resources=ray_resources, + resources=_flytekit_resources_to_pb_resources(self.resources), ) @classmethod diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index b58a7961ce..0d0f0eb6ac 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -58,11 +58,6 @@ def __post_init__(self): raise ValueError("Cannot specify both pod_template and requests/limits") -@dataclass -class ResourcesConfig: - requests: Optional[Resources] = None - limits: Optional[Resources] = None - @dataclass class AutoscalerOptionsConfig: @@ -70,7 +65,7 @@ class AutoscalerOptionsConfig: idle_timeout_seconds: Optional[int] = None env: Optional[Dict[str, str]] = None image: Optional[str] = None - resources: Optional[ResourcesConfig] = None + resources: Optional[Resources] = None @dataclass @@ -160,18 +155,12 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] autoscalerOptions = None if cfg.autoscaler_options is not None: - requests = None - limits = None - if cfg.autoscaler_options.resources is not None: - requests = cfg.autoscaler_options.resources.requests if cfg.autoscaler_options.resources.requests is not None else None - limits = cfg.autoscaler_options.resources.limits if cfg.autoscaler_options.resources.limits is not None else None autoscalerOptions = AutoscalerOptions( upscaling_mode=cfg.autoscaler_options.upscaling_mode, idle_timeout_seconds=cfg.autoscaler_options.idle_timeout_seconds, image=cfg.autoscaler_options.image, env=cfg.autoscaler_options.env, - requests=requests, - limits=limits, + resources=cfg.autoscaler_options.resources ) ray_job = RayJob( From 9e50bd847606f41b10e23017ffe0d28753e4ac01 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Thu, 2 Apr 2026 15:58:32 +0800 Subject: [PATCH 08/17] fix bug Signed-off-by: Yuteng Chen --- plugins/flytekit-ray/flytekitplugins/ray/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/__init__.py b/plugins/flytekit-ray/flytekitplugins/ray/__init__.py index 32b2caf8de..0aaf4e39b1 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/__init__.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/__init__.py @@ -13,4 +13,4 @@ WorkerNodeConfig """ -from .task import HeadNodeConfig, RayJobConfig, WorkerNodeConfig, AutoscalerOptionsConfig, ResourcesConfig +from .task import HeadNodeConfig, RayJobConfig, WorkerNodeConfig, AutoscalerOptionsConfig From 3cfdf661f83348b1e531e92ed7a404b873660ed9 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Thu, 2 Apr 2026 19:21:50 +0800 Subject: [PATCH 09/17] add resources Signed-off-by: Yuteng Chen --- .../flytekitplugins/ray/models.py | 43 +++++++++++++------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 16b41a9914..c6d687ea20 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -39,15 +39,33 @@ def _flytekit_resources_to_pb_resources(resources: typing.Optional[Resources]) - ) -def _ray_entries_to_flytekit_resources(entries) -> typing.Optional[Resources]: - """Convert a list of _ray_pb2.Resources.ResourceEntry back to flytekit.Resources.""" - cpu, mem = None, None - for entry in entries: - if entry.name == _ray_pb2.Resources.CPU: - cpu = entry.value - elif entry.name == _ray_pb2.Resources.MEMORY: - mem = entry.value - return Resources(cpu=cpu, mem=mem) if (cpu or mem) else None +def _ray_resources_to_flytekit_resources(pb_res: typing.Optional[_tasks_pb2.Resources]) -> typing.Optional[Resources]: + if pb_res is None: + return None + + cpu_request = cpu_limit = mem_request = mem_limit = None + + for entry in pb_res.requests: + if entry.name == _tasks_pb2.Resources.ResourceName.CPU: + cpu_request = entry.value + elif entry.name == _tasks_pb2.Resources.ResourceName.MEMORY: + mem_request = entry.value + + for entry in pb_res.limits: + if entry.name == _tasks_pb2.Resources.ResourceName.CPU: + cpu_limit = entry.value + elif entry.name == _tasks_pb2.Resources.ResourceName.MEMORY: + mem_limit = entry.value + + cpu = None + if cpu_request is not None or cpu_limit is not None: + cpu = cpu_request if cpu_request == cpu_limit else (cpu_request, cpu_limit) + + mem = None + if mem_request is not None or mem_limit is not None: + mem = mem_request if mem_request == mem_limit else (mem_request, mem_limit) + + return Resources(cpu=cpu, mem=mem) if (cpu is not None or mem is not None) else None class WorkerGroupSpec(_common.FlyteIdlEntity): @@ -222,7 +240,7 @@ def env(self) -> typing.Optional[typing.Dict[str, str]]: return self._env @property - def resources(self): + def resources(self) -> typing.Optional[Resources]: return self._resources def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: @@ -231,7 +249,6 @@ def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: for key, val in self.env.items(): envs.append(_ray_pb2.KeyValuePair(key=key, value=val)) return _ray_pb2.AutoscalerOptions( - upscaling_mode=self.upscaling_mode, idle_timeout_seconds=self.idle_timeout_seconds, image=self.image, env=envs if envs else None, @@ -242,12 +259,10 @@ def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: def from_flyte_idl(cls, proto): has_resources = proto.HasField("resources") return cls( - upscaling_mode=proto.upscaling_mode, idle_timeout_seconds=proto.idle_timeout_seconds, image=proto.image, env={e.key: e.value for e in proto.env} if proto.env else None, - requests=_ray_entries_to_flytekit_resources(proto.resources.requests) if has_resources else None, - limits=_ray_entries_to_flytekit_resources(proto.resources.limits) if has_resources else None, + resources=_ray_resources_to_flytekit_resources(has_resources), ) From 37bc74e1c29f1f6abe11d64eafbc77244bd983b9 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Thu, 2 Apr 2026 21:22:58 +0800 Subject: [PATCH 10/17] fix bugs Signed-off-by: Yuteng Chen --- .../flytekitplugins/ray/models.py | 23 +++++++++++-------- plugins/flytekit-ray/tests/test_ray.py | 10 +++----- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index c6d687ea20..324851e07b 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -1,6 +1,7 @@ import typing from flyteidl.core import tasks_pb2 as _tasks_pb2 +from flyteidl.core import literals_pb2 as _literals_pb2 from flyteidl.plugins import ray_pb2 as _ray_pb2 from flytekit import Resources @@ -12,27 +13,27 @@ def _flytekit_resources_to_pb_resources(resources: typing.Optional[Resources]) - if resources is None: return None - requests: List[_tasks_pb2.Resources.ResourceEntry] = [] - limits: List[_tasks_pb2.Resources.ResourceEntry] = [] + requests: typing.List[_tasks_pb2.Resources.ResourceEntry] = [] + limits: typing.List[_tasks_pb2.Resources.ResourceEntry] = [] if resources.cpu is not None: if isinstance(resources.cpu, (list, tuple)): request = resources.cpu[0] - limit = resources.cpu[1] if len(resource.cpu) == 2 else request + limit = resources.cpu[1] if len(resources.cpu) == 2 else request else: limit = request = resources.cpu - requests.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=request)) - limits.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=limit)) + requests.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=str(request))) + limits.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=str(limit))) if resources.mem is not None: if isinstance(resources.mem, (list, tuple)): request = resources.mem[0] - limit = resources.mem[1] if len(resource.mem) == 2 else request + limit = resources.mem[1] if len(resources.mem) == 2 else request else: limit = request = resources.mem requests.append( - _tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=request) + _tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=str(request)) ) - limits.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=limit)) + limits.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=str(limit))) return _tasks_pb2.Resources( requests=requests, limits=limits, @@ -247,8 +248,9 @@ def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: envs = [] if self.env: for key, val in self.env.items(): - envs.append(_ray_pb2.KeyValuePair(key=key, value=val)) + envs.append(_literals_pb2.KeyValuePair(key=key, value=val)) return _ray_pb2.AutoscalerOptions( + upscaling_mode=_ray_pb2.AutoscalerOptions.UpscalingMode.Value("UPSCALING_MODE_"+self.upscaling_mode.upper()) if self.upscaling_mode else None, idle_timeout_seconds=self.idle_timeout_seconds, image=self.image, env=envs if envs else None, @@ -259,10 +261,11 @@ def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: def from_flyte_idl(cls, proto): has_resources = proto.HasField("resources") return cls( + upscaling_mode=_ray_pb2.AutoscalerOptions.UpscalingMode.Name(proto.upscaling_mode).removeprefix("UPSCALING_MODE_").title() if proto.upscaling_mode else None, idle_timeout_seconds=proto.idle_timeout_seconds, image=proto.image, env={e.key: e.value for e in proto.env} if proto.env else None, - resources=_ray_resources_to_flytekit_resources(has_resources), + resources=_ray_resources_to_flytekit_resources(proto.resources if has_resources else None), ) diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index ad096d43ee..8f2d535cd7 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -13,7 +13,7 @@ RayJob, WorkerGroupSpec, ) -from flytekitplugins.ray.task import AutoscalerOptionsConfig, RayJobConfig, ResourcesConfig, WorkerNodeConfig +from flytekitplugins.ray.task import AutoscalerOptionsConfig, RayJobConfig, WorkerNodeConfig from google.protobuf.json_format import MessageToDict from flytekit import PythonFunctionTask, task, PodTemplate, Resources @@ -45,10 +45,7 @@ idle_timeout_seconds=120, image="rayproject/ray:2.9.0", env={"lKeyA": "lValA"}, - resources=ResourcesConfig( - requests=Resources(cpu="1", mem="1Gi"), - limits=Resources(cpu="2", mem="2Gi"), - ), + resources=Resources(cpu=("1","2"), mem="1Gi"), ), shutdown_after_job_finishes=True, ttl_seconds_after_finished=20, @@ -102,8 +99,7 @@ def t1(a: int) -> str: idle_timeout_seconds=120, image="rayproject/ray:2.9.0", env={"lKeyA": "lValA"}, - requests=Resources(cpu="1", mem="1Gi"), - limits=Resources(cpu="2", mem="2Gi"), + resources = Resources(cpu=("1","2"), mem="1Gi"), ), ), runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(), From 139ae106f644a5660ddb6f337567685d58c7dca5 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Thu, 2 Apr 2026 21:28:15 +0800 Subject: [PATCH 11/17] format Signed-off-by: Yuteng Chen --- .../flytekitplugins/ray/__init__.py | 2 +- .../flytekitplugins/ray/models.py | 22 ++++++++++++++----- .../flytekit-ray/flytekitplugins/ray/task.py | 3 +-- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/__init__.py b/plugins/flytekit-ray/flytekitplugins/ray/__init__.py index 0aaf4e39b1..da2424529b 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/__init__.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/__init__.py @@ -13,4 +13,4 @@ WorkerNodeConfig """ -from .task import HeadNodeConfig, RayJobConfig, WorkerNodeConfig, AutoscalerOptionsConfig +from .task import AutoscalerOptionsConfig, HeadNodeConfig, RayJobConfig, WorkerNodeConfig diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 324851e07b..c56fade4ee 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -1,7 +1,7 @@ import typing -from flyteidl.core import tasks_pb2 as _tasks_pb2 from flyteidl.core import literals_pb2 as _literals_pb2 +from flyteidl.core import tasks_pb2 as _tasks_pb2 from flyteidl.plugins import ray_pb2 as _ray_pb2 from flytekit import Resources @@ -21,7 +21,9 @@ def _flytekit_resources_to_pb_resources(resources: typing.Optional[Resources]) - limit = resources.cpu[1] if len(resources.cpu) == 2 else request else: limit = request = resources.cpu - requests.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=str(request))) + requests.append( + _tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=str(request)) + ) limits.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=str(limit))) if resources.mem is not None: @@ -33,7 +35,9 @@ def _flytekit_resources_to_pb_resources(resources: typing.Optional[Resources]) - requests.append( _tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=str(request)) ) - limits.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=str(limit))) + limits.append( + _tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=str(limit)) + ) return _tasks_pb2.Resources( requests=requests, limits=limits, @@ -250,7 +254,11 @@ def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: for key, val in self.env.items(): envs.append(_literals_pb2.KeyValuePair(key=key, value=val)) return _ray_pb2.AutoscalerOptions( - upscaling_mode=_ray_pb2.AutoscalerOptions.UpscalingMode.Value("UPSCALING_MODE_"+self.upscaling_mode.upper()) if self.upscaling_mode else None, + upscaling_mode=_ray_pb2.AutoscalerOptions.UpscalingMode.Value( + "UPSCALING_MODE_" + self.upscaling_mode.upper() + ) + if self.upscaling_mode + else None, idle_timeout_seconds=self.idle_timeout_seconds, image=self.image, env=envs if envs else None, @@ -261,7 +269,11 @@ def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: def from_flyte_idl(cls, proto): has_resources = proto.HasField("resources") return cls( - upscaling_mode=_ray_pb2.AutoscalerOptions.UpscalingMode.Name(proto.upscaling_mode).removeprefix("UPSCALING_MODE_").title() if proto.upscaling_mode else None, + upscaling_mode=_ray_pb2.AutoscalerOptions.UpscalingMode.Name(proto.upscaling_mode) + .removeprefix("UPSCALING_MODE_") + .title() + if proto.upscaling_mode + else None, idle_timeout_seconds=proto.idle_timeout_seconds, image=proto.image, env={e.key: e.value for e in proto.env} if proto.env else None, diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 0d0f0eb6ac..024f9805ff 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -58,7 +58,6 @@ def __post_init__(self): raise ValueError("Cannot specify both pod_template and requests/limits") - @dataclass class AutoscalerOptionsConfig: upscaling_mode: Optional[str] = None @@ -160,7 +159,7 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] idle_timeout_seconds=cfg.autoscaler_options.idle_timeout_seconds, image=cfg.autoscaler_options.image, env=cfg.autoscaler_options.env, - resources=cfg.autoscaler_options.resources + resources=cfg.autoscaler_options.resources, ) ray_job = RayJob( From f4bee0dcf5584398a35c396d10d1b19e3a16ba61 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Mon, 6 Apr 2026 13:31:13 +0800 Subject: [PATCH 12/17] adopting requests and limits Signed-off-by: Yuteng Chen --- .../flytekitplugins/ray/models.py | 77 ++----------------- .../flytekit-ray/flytekitplugins/ray/task.py | 10 ++- plugins/flytekit-ray/tests/test_ray.py | 10 ++- 3 files changed, 19 insertions(+), 78 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index c56fade4ee..89c468ddc9 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -4,75 +4,10 @@ from flyteidl.core import tasks_pb2 as _tasks_pb2 from flyteidl.plugins import ray_pb2 as _ray_pb2 -from flytekit import Resources from flytekit.models import common as _common +from flytekit.models import task as _task_models from flytekit.models.task import K8sPod - -def _flytekit_resources_to_pb_resources(resources: typing.Optional[Resources]) -> typing.Optional[_tasks_pb2.Resources]: - if resources is None: - return None - - requests: typing.List[_tasks_pb2.Resources.ResourceEntry] = [] - limits: typing.List[_tasks_pb2.Resources.ResourceEntry] = [] - if resources.cpu is not None: - if isinstance(resources.cpu, (list, tuple)): - request = resources.cpu[0] - limit = resources.cpu[1] if len(resources.cpu) == 2 else request - else: - limit = request = resources.cpu - requests.append( - _tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=str(request)) - ) - limits.append(_tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.CPU, value=str(limit))) - - if resources.mem is not None: - if isinstance(resources.mem, (list, tuple)): - request = resources.mem[0] - limit = resources.mem[1] if len(resources.mem) == 2 else request - else: - limit = request = resources.mem - requests.append( - _tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=str(request)) - ) - limits.append( - _tasks_pb2.Resources.ResourceEntry(name=_tasks_pb2.Resources.ResourceName.MEMORY, value=str(limit)) - ) - return _tasks_pb2.Resources( - requests=requests, - limits=limits, - ) - - -def _ray_resources_to_flytekit_resources(pb_res: typing.Optional[_tasks_pb2.Resources]) -> typing.Optional[Resources]: - if pb_res is None: - return None - - cpu_request = cpu_limit = mem_request = mem_limit = None - - for entry in pb_res.requests: - if entry.name == _tasks_pb2.Resources.ResourceName.CPU: - cpu_request = entry.value - elif entry.name == _tasks_pb2.Resources.ResourceName.MEMORY: - mem_request = entry.value - - for entry in pb_res.limits: - if entry.name == _tasks_pb2.Resources.ResourceName.CPU: - cpu_limit = entry.value - elif entry.name == _tasks_pb2.Resources.ResourceName.MEMORY: - mem_limit = entry.value - - cpu = None - if cpu_request is not None or cpu_limit is not None: - cpu = cpu_request if cpu_request == cpu_limit else (cpu_request, cpu_limit) - - mem = None - if mem_request is not None or mem_limit is not None: - mem = mem_request if mem_request == mem_limit else (mem_request, mem_limit) - - return Resources(cpu=cpu, mem=mem) if (cpu is not None or mem is not None) else None - - class WorkerGroupSpec(_common.FlyteIdlEntity): def __init__( self, @@ -220,7 +155,7 @@ def __init__( idle_timeout_seconds: typing.Optional[int] = None, image: typing.Optional[str] = None, env: typing.Optional[typing.Dict[str, str]] = None, - resources: typing.Optional[Resources] = None, + resources: typing.Optional[_task_models.Resources] = None, ): self._upscaling_mode = upscaling_mode self._idle_timeout_seconds = idle_timeout_seconds @@ -245,7 +180,7 @@ def env(self) -> typing.Optional[typing.Dict[str, str]]: return self._env @property - def resources(self) -> typing.Optional[Resources]: + def resources(self) -> typing.Optional[_task_models.Resources]: return self._resources def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: @@ -262,12 +197,10 @@ def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: idle_timeout_seconds=self.idle_timeout_seconds, image=self.image, env=envs if envs else None, - resources=_flytekit_resources_to_pb_resources(self.resources), + resources=self.resources.to_flyte_idl() if self.resources else None, ) - @classmethod def from_flyte_idl(cls, proto): - has_resources = proto.HasField("resources") return cls( upscaling_mode=_ray_pb2.AutoscalerOptions.UpscalingMode.Name(proto.upscaling_mode) .removeprefix("UPSCALING_MODE_") @@ -277,7 +210,7 @@ def from_flyte_idl(cls, proto): idle_timeout_seconds=proto.idle_timeout_seconds, image=proto.image, env={e.key: e.value for e in proto.env} if proto.env else None, - resources=_ray_resources_to_flytekit_resources(proto.resources if has_resources else None), + resources=_task_models.Resources.from_flyte_idl(proto.resources) if proto.HasField("resources") else None, ) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 024f9805ff..dd7d52a969 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -4,6 +4,7 @@ import typing from dataclasses import dataclass from typing import Any, Callable, Dict, Optional +from flytekit.core.resources import convert_resources_to_resource_model import yaml from flytekitplugins.ray.models import ( @@ -64,8 +65,8 @@ class AutoscalerOptionsConfig: idle_timeout_seconds: Optional[int] = None env: Optional[Dict[str, str]] = None image: Optional[str] = None - resources: Optional[Resources] = None - + requests: Optional[Resources] = None + limits: Optional[Resources] = None @dataclass class RayJobConfig: @@ -159,7 +160,10 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] idle_timeout_seconds=cfg.autoscaler_options.idle_timeout_seconds, image=cfg.autoscaler_options.image, env=cfg.autoscaler_options.env, - resources=cfg.autoscaler_options.resources, + resources=convert_resources_to_resource_model( + requests=cfg.autoscaler_options.requests, + limits=cfg.autoscaler_options.limits, + ) ) ray_job = RayJob( diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index 8f2d535cd7..bf263b85ee 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -4,7 +4,7 @@ import ray import yaml -from flytekit.core.resources import pod_spec_from_resources +from flytekit.core.resources import convert_resources_to_resource_model, pod_spec_from_resources from flytekitplugins.ray import HeadNodeConfig from flytekitplugins.ray.models import ( AutoscalerOptions, @@ -45,7 +45,8 @@ idle_timeout_seconds=120, image="rayproject/ray:2.9.0", env={"lKeyA": "lValA"}, - resources=Resources(cpu=("1","2"), mem="1Gi"), + requests=Resources(cpu="1", mem="1Gi"), + limits=Resources(cpu="2", mem="2Gi"), ), shutdown_after_job_finishes=True, ttl_seconds_after_finished=20, @@ -99,7 +100,10 @@ def t1(a: int) -> str: idle_timeout_seconds=120, image="rayproject/ray:2.9.0", env={"lKeyA": "lValA"}, - resources = Resources(cpu=("1","2"), mem="1Gi"), + resources=convert_resources_to_resource_model( + requests=Resources(cpu="1", mem="1Gi"), + limits=Resources(cpu="2", mem="2Gi"), + ), ), ), runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(), From 8063a1c1e4f4946b28b86c33fdb6695d869162da Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Sat, 18 Apr 2026 09:38:33 +0800 Subject: [PATCH 13/17] fix problems in core based on mypy lint Signed-off-by: Yuteng Chen --- flytekit/core/artifact.py | 8 ++++---- flytekit/core/interface.py | 2 +- flytekit/core/promise.py | 2 +- flytekit/core/type_engine.py | 20 ++++++++++---------- flytekit/core/worker_queue.py | 2 +- flytekit/core/workflow.py | 4 ++++ 6 files changed, 21 insertions(+), 17 deletions(-) diff --git a/flytekit/core/artifact.py b/flytekit/core/artifact.py index 47e5b146c8..d9af29bcef 100644 --- a/flytekit/core/artifact.py +++ b/flytekit/core/artifact.py @@ -91,16 +91,16 @@ def bind_partitions(self, *args, **kwargs) -> ArtifactIDSpecification: if not self.artifact.partition_keys or k not in self.artifact.partition_keys: raise ValueError(f"Partition key {k} not found in {self.artifact.partition_keys}") if isinstance(v, art_id.InputBindingData): - p.partitions[k] = Partition(art_id.LabelValue(input_binding=v), name=k) + p.partitions[k] = Partition(art_id.LabelValue(input_binding=v), name=k) # type: ignore[index] elif isinstance(v, str): - p.partitions[k] = Partition(art_id.LabelValue(static_value=v), name=k) + p.partitions[k] = Partition(art_id.LabelValue(static_value=v), name=k) # type: ignore[index] else: raise ValueError(f"Partition key {k} needs to be input binding data or static string, not {v}") for k in self.artifact.partition_keys: - if k not in p.partitions: + if k not in p.partitions: # type: ignore[operator] logger.debug(f"Partition {k} not bound for {self.artifact.name}, setting to dynamic binding.") - p.partitions[k] = Partition(value=DYNAMIC_INPUT_BINDING, name=k) + p.partitions[k] = Partition(value=DYNAMIC_INPUT_BINDING, name=k) # type: ignore[index] # Given the context, shouldn't need to set further reference_artifacts. self.partitions = p diff --git a/flytekit/core/interface.py b/flytekit/core/interface.py index 2e0a4cfe0b..28a099ba6f 100644 --- a/flytekit/core/interface.py +++ b/flytekit/core/interface.py @@ -440,7 +440,7 @@ def transform_function_to_interface( # This is just for typing.NamedTuples - in those cases, the user can select a name to call the NamedTuple. We # would like to preserve that name in our custom collections.namedtuple. custom_name = None - if hasattr(return_annotation, "__bases__"): + if return_annotation is not None and hasattr(return_annotation, "__bases__"): bases = return_annotation.__bases__ if len(bases) == 1 and bases[0] == tuple and hasattr(return_annotation, "_fields"): if hasattr(return_annotation, "__name__") and return_annotation.__name__ != "": diff --git a/flytekit/core/promise.py b/flytekit/core/promise.py index 712f0d25ca..ea56318544 100644 --- a/flytekit/core/promise.py +++ b/flytekit/core/promise.py @@ -888,7 +888,7 @@ async def binding_data_from_python_std( lt_type = expected_literal_type.union_type.variants[i] python_type = get_args(t_value_type)[i] if t_value_type else None try: - return await binding_data_from_python_std(ctx, lt_type, t_value, python_type, nodes) + return await binding_data_from_python_std(ctx, lt_type, t_value, cast(type, python_type), nodes) except Exception as e: logger.debug( f"Failed to bind data {t_value} " diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 9993c98479..b4247c664b 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -18,7 +18,7 @@ from collections import OrderedDict from functools import lru_cache, reduce from types import GenericAlias -from typing import Any, Dict, List, NamedTuple, Optional, Type, cast +from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Type, cast import msgpack from cachetools import LRUCache @@ -563,7 +563,7 @@ def assert_type(self, expected_type: Type[DataClassJsonMixin], v: T): original_dict = v # Find the Optional keys in expected_fields_dict - optional_keys = {k for k, t in expected_fields_dict.items() if UnionTransformer.is_optional_type(t)} + optional_keys = {k for k, t in expected_fields_dict.items() if UnionTransformer.is_optional_type(cast(type, t))} # Remove the Optional keys from the keys of original_dict original_key = set(original_dict.keys()) - optional_keys @@ -586,9 +586,9 @@ def assert_type(self, expected_type: Type[DataClassJsonMixin], v: T): for k, v in original_dict.items(): if k in expected_fields_dict: if isinstance(v, dict): - self.assert_type(expected_fields_dict[k], v) + self.assert_type(cast(type, expected_fields_dict[k]), v) else: - expected_type = expected_fields_dict[k] + expected_type = cast(type, expected_fields_dict[k]) original_type = type(v) if UnionTransformer.is_optional_type(expected_type): expected_type = UnionTransformer.get_sub_type_in_optional(expected_type) @@ -599,12 +599,12 @@ def assert_type(self, expected_type: Type[DataClassJsonMixin], v: T): else: for f in dataclasses.fields(type(v)): # type: ignore - original_type = f.type + original_type = cast(type, f.type) if f.name not in expected_fields_dict: raise TypeTransformerFailedError( f"Field '{f.name}' is not present in the expected dataclass fields {expected_type.__name__}" ) - expected_type = expected_fields_dict[f.name] + expected_type = cast(type, expected_fields_dict[f.name]) if UnionTransformer.is_optional_type(original_type): original_type = UnionTransformer.get_sub_type_in_optional(original_type) @@ -796,7 +796,7 @@ def _get_origin_type_in_annotation(self, python_type: Type[T]) -> Type[T]: return get_args(python_type)[0] elif dataclasses.is_dataclass(python_type): for field in dataclasses.fields(copy.deepcopy(python_type)): - field.type = self._get_origin_type_in_annotation(field.type) + field.type = self._get_origin_type_in_annotation(cast(type, field.type)) return python_type def _make_dataclass_serializable(self, python_val: T, python_type: Type[T]) -> typing.Any: @@ -923,7 +923,7 @@ def _fix_dataclass_int(self, dc_type: Type[dataclasses.dataclass], dc: typing.An # Thus we will have to walk the given dataclass and typecast values to int, where expected. for f in dataclasses.fields(dc_type): val = getattr(dc, f.name) - object.__setattr__(dc, f.name, self._fix_val_int(f.type, val)) + object.__setattr__(dc, f.name, self._fix_val_int(cast(type, f.type), val)) return dc @@ -1024,7 +1024,7 @@ def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], exp try: if type(python_val) == _struct.ListValue: literals = [] - for v in python_val: + for v in cast(Iterable, python_val): literal_type = TypeEngine.to_literal_type(type(v)) # Recursively convert python native values to literals literal = TypeEngine.to_literal(ctx, v, type(v), literal_type) @@ -2662,7 +2662,7 @@ def dataclass_from_dict(cls: type, src: typing.Dict[str, typing.Any]) -> typing. constructor_inputs = {} for field_name, value in src.items(): if dataclasses.is_dataclass(field_types_lookup[field_name]): - constructor_inputs[field_name] = dataclass_from_dict(field_types_lookup[field_name], value) + constructor_inputs[field_name] = dataclass_from_dict(cast(type, field_types_lookup[field_name]), value) else: constructor_inputs[field_name] = value diff --git a/flytekit/core/worker_queue.py b/flytekit/core/worker_queue.py index bc0c48bd55..3dd79812f8 100644 --- a/flytekit/core/worker_queue.py +++ b/flytekit/core/worker_queue.py @@ -454,7 +454,7 @@ def _entity_type(entity) -> str: for item in items_list: exec_output = "" if item.is_in_terminal_state: - exec_output = item.result if item.result else item.error + exec_output = str(item.result if item.result else item.error) kind = _entity_type(item.entity) diff --git a/flytekit/core/workflow.py b/flytekit/core/workflow.py index 5feffbf998..667a2c0225 100644 --- a/flytekit/core/workflow.py +++ b/flytekit/core/workflow.py @@ -779,6 +779,10 @@ def __init__( self.compiled = False + @property + def name(self) -> str: # type: ignore[override] + return self._name + @property def function(self): return self._workflow_function From 7e289a20be6fa5caf6b230dd7c7c2c187b099ec9 Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Sat, 18 Apr 2026 09:42:22 +0800 Subject: [PATCH 14/17] format Signed-off-by: Yuteng Chen --- flytekit/core/type_engine.py | 4 +++- plugins/flytekit-ray/flytekitplugins/ray/models.py | 3 ++- plugins/flytekit-ray/flytekitplugins/ray/task.py | 6 +++--- plugins/flytekit-ray/tests/test_ray.py | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index b4247c664b..76ecb10828 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -563,7 +563,9 @@ def assert_type(self, expected_type: Type[DataClassJsonMixin], v: T): original_dict = v # Find the Optional keys in expected_fields_dict - optional_keys = {k for k, t in expected_fields_dict.items() if UnionTransformer.is_optional_type(cast(type, t))} + optional_keys = { + k for k, t in expected_fields_dict.items() if UnionTransformer.is_optional_type(cast(type, t)) + } # Remove the Optional keys from the keys of original_dict original_key = set(original_dict.keys()) - optional_keys diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 89c468ddc9..73ab6f55d7 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -1,13 +1,13 @@ import typing from flyteidl.core import literals_pb2 as _literals_pb2 -from flyteidl.core import tasks_pb2 as _tasks_pb2 from flyteidl.plugins import ray_pb2 as _ray_pb2 from flytekit.models import common as _common from flytekit.models import task as _task_models from flytekit.models.task import K8sPod + class WorkerGroupSpec(_common.FlyteIdlEntity): def __init__( self, @@ -199,6 +199,7 @@ def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: env=envs if envs else None, resources=self.resources.to_flyte_idl() if self.resources else None, ) + @classmethod def from_flyte_idl(cls, proto): return cls( diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index dd7d52a969..a7fa0cbd07 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -4,7 +4,6 @@ import typing from dataclasses import dataclass from typing import Any, Callable, Dict, Optional -from flytekit.core.resources import convert_resources_to_resource_model import yaml from flytekitplugins.ray.models import ( @@ -20,7 +19,7 @@ from flytekit.configuration import SerializationSettings from flytekit.core.context_manager import ExecutionParameters, FlyteContextManager from flytekit.core.python_function_task import PythonFunctionTask -from flytekit.core.resources import pod_spec_from_resources +from flytekit.core.resources import convert_resources_to_resource_model, pod_spec_from_resources from flytekit.extend import TaskPlugins from flytekit.models.task import K8sPod @@ -68,6 +67,7 @@ class AutoscalerOptionsConfig: requests: Optional[Resources] = None limits: Optional[Resources] = None + @dataclass class RayJobConfig: worker_node_config: typing.List[WorkerNodeConfig] @@ -163,7 +163,7 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] resources=convert_resources_to_resource_model( requests=cfg.autoscaler_options.requests, limits=cfg.autoscaler_options.limits, - ) + ), ) ray_job = RayJob( diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index bf263b85ee..aadc795f2b 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -185,4 +185,4 @@ def t1(a: int) -> str: "t1", ] - # cannot execute this as it will try to hit a non-existent cluster \ No newline at end of file + # cannot execute this as it will try to hit a non-existent cluster From 1da8c4c1eb37b8c3304cd6fb83eabdc15ad74e4d Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Wed, 22 Apr 2026 15:25:41 +0800 Subject: [PATCH 15/17] remove Optional when partitions is always {} not None Signed-off-by: Yuteng Chen --- flytekit/core/artifact.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flytekit/core/artifact.py b/flytekit/core/artifact.py index d9af29bcef..1af44f99e1 100644 --- a/flytekit/core/artifact.py +++ b/flytekit/core/artifact.py @@ -91,16 +91,16 @@ def bind_partitions(self, *args, **kwargs) -> ArtifactIDSpecification: if not self.artifact.partition_keys or k not in self.artifact.partition_keys: raise ValueError(f"Partition key {k} not found in {self.artifact.partition_keys}") if isinstance(v, art_id.InputBindingData): - p.partitions[k] = Partition(art_id.LabelValue(input_binding=v), name=k) # type: ignore[index] + p.partitions[k] = Partition(art_id.LabelValue(input_binding=v), name=k) elif isinstance(v, str): - p.partitions[k] = Partition(art_id.LabelValue(static_value=v), name=k) # type: ignore[index] + p.partitions[k] = Partition(art_id.LabelValue(static_value=v), name=k) else: raise ValueError(f"Partition key {k} needs to be input binding data or static string, not {v}") for k in self.artifact.partition_keys: - if k not in p.partitions: # type: ignore[operator] + if k not in p.partitions: logger.debug(f"Partition {k} not bound for {self.artifact.name}, setting to dynamic binding.") - p.partitions[k] = Partition(value=DYNAMIC_INPUT_BINDING, name=k) # type: ignore[index] + p.partitions[k] = Partition(value=DYNAMIC_INPUT_BINDING, name=k) # Given the context, shouldn't need to set further reference_artifacts. self.partitions = p @@ -348,7 +348,7 @@ def _repr_html_(self): return ", ".join([str(x) for x in self.__rich_repr__()]) @property - def partitions(self) -> Optional[typing.Dict[str, Partition]]: + def partitions(self) -> typing.Dict[str, Partition]: return self._partitions def set_reference_artifact(self, artifact: Artifact): From bc698eb91cb4424214863b60a1999c6075f281fa Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Wed, 22 Apr 2026 15:28:29 +0800 Subject: [PATCH 16/17] remove Optional when partitions is always {} not None Signed-off-by: Yuteng Chen --- flytekit/core/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/core/workflow.py b/flytekit/core/workflow.py index 667a2c0225..dde5e493cf 100644 --- a/flytekit/core/workflow.py +++ b/flytekit/core/workflow.py @@ -780,7 +780,7 @@ def __init__( self.compiled = False @property - def name(self) -> str: # type: ignore[override] + def name(self) -> str: return self._name @property From 1bd6f3e454f8dea4d1e51ee2b511b6605e25786c Mon Sep 17 00:00:00 2001 From: Yuteng Chen Date: Wed, 22 Apr 2026 21:53:52 +0800 Subject: [PATCH 17/17] provide enum scalingMode Signed-off-by: Yuteng Chen --- .../flytekitplugins/ray/models.py | 22 +++++++++---------- .../flytekit-ray/flytekitplugins/ray/task.py | 3 ++- plugins/flytekit-ray/tests/test_ray.py | 4 ++-- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 73ab6f55d7..2ad6b4106a 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -149,9 +149,15 @@ def from_flyte_idl(cls, proto): class AutoscalerOptions(_common.FlyteIdlEntity): + class UpscalingMode(object): + UNSPECIFIED = _ray_pb2.AutoscalerOptions.UpscalingMode.UPSCALING_MODE_UNSPECIFIED + DEFAULT = _ray_pb2.AutoscalerOptions.UpscalingMode.UPSCALING_MODE_DEFAULT + AGGRESSIVE = _ray_pb2.AutoscalerOptions.UpscalingMode.UPSCALING_MODE_AGGRESSIVE + CONSERVATIVE = _ray_pb2.AutoscalerOptions.UpscalingMode.UPSCALING_MODE_CONSERVATIVE + def __init__( self, - upscaling_mode: typing.Optional[str] = None, + upscaling_mode: typing.Optional["AutoscalerOptions.UpscalingMode"] = None, idle_timeout_seconds: typing.Optional[int] = None, image: typing.Optional[str] = None, env: typing.Optional[typing.Dict[str, str]] = None, @@ -164,7 +170,7 @@ def __init__( self._resources = resources @property - def upscaling_mode(self) -> typing.Optional[str]: + def upscaling_mode(self) -> typing.Optional["AutoscalerOptions.UpscalingMode"]: return self._upscaling_mode @property @@ -189,11 +195,7 @@ def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: for key, val in self.env.items(): envs.append(_literals_pb2.KeyValuePair(key=key, value=val)) return _ray_pb2.AutoscalerOptions( - upscaling_mode=_ray_pb2.AutoscalerOptions.UpscalingMode.Value( - "UPSCALING_MODE_" + self.upscaling_mode.upper() - ) - if self.upscaling_mode - else None, + upscaling_mode=self.upscaling_mode, idle_timeout_seconds=self.idle_timeout_seconds, image=self.image, env=envs if envs else None, @@ -203,11 +205,7 @@ def to_flyte_idl(self) -> _ray_pb2.AutoscalerOptions: @classmethod def from_flyte_idl(cls, proto): return cls( - upscaling_mode=_ray_pb2.AutoscalerOptions.UpscalingMode.Name(proto.upscaling_mode) - .removeprefix("UPSCALING_MODE_") - .title() - if proto.upscaling_mode - else None, + upscaling_mode=proto.upscaling_mode, idle_timeout_seconds=proto.idle_timeout_seconds, image=proto.image, env={e.key: e.value for e in proto.env} if proto.env else None, diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index a7fa0cbd07..cac673da21 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -60,7 +60,8 @@ def __post_init__(self): @dataclass class AutoscalerOptionsConfig: - upscaling_mode: Optional[str] = None + UpscalingMode = AutoscalerOptions.UpscalingMode + upscaling_mode: Optional["AutoscalerOptions.UpscalingMode"] = None idle_timeout_seconds: Optional[int] = None env: Optional[Dict[str, str]] = None image: Optional[str] = None diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index aadc795f2b..734aba299c 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -41,7 +41,7 @@ runtime_env={"pip": ["numpy"]}, enable_autoscaling=True, autoscaler_options=AutoscalerOptionsConfig( - upscaling_mode="Conservative", + upscaling_mode=AutoscalerOptionsConfig.UpscalingMode.CONSERVATIVE, idle_timeout_seconds=120, image="rayproject/ray:2.9.0", env={"lKeyA": "lValA"}, @@ -96,7 +96,7 @@ def t1(a: int) -> str: head_group_spec=HeadGroupSpec(k8s_pod=K8sPod.from_pod_template(head_pod_template)), enable_autoscaling=True, autoscaler_options=AutoscalerOptions( - upscaling_mode="Conservative", + upscaling_mode=AutoscalerOptions.UpscalingMode.CONSERVATIVE, idle_timeout_seconds=120, image="rayproject/ray:2.9.0", env={"lKeyA": "lValA"},