1- from . import argo
2-
1+ import argo .workflows
2+ from cloudharness .utils import dict_merge
3+ from . import argo_service
4+ from argo .workflows .client import V1alpha1Template
35from cloudharness .utils .env import get_cloudharness_variables , get_image_full_tag
46from .utils import WORKFLOW_NAME_VARIABLE_NAME , PodExecutionContext , affinity_spec , is_accounts_present , volume_mount_template , volume_requires_affinity
57
68SERVICE_ACCOUNT = 'argo-workflows'
79
810
9- class Task (argo .ArgoObject ):
11+ class Task (argo_service .ArgoObject ):
1012 """
1113 Abstract interface for a task.
1214 """
1315
14- def __init__ (self , name , resources = {} , volume_mounts = [] , ** env_args ):
16+ def __init__ (self , name , resources = None , volume_mounts = None , retry_limit = 10 , template_overrides : V1alpha1Template = None , ** env_args ):
1517 self .name = name .replace (' ' , '-' ).lower ()
16- self .resources = resources
18+ self .resources = resources or {}
1719 self .__envs = get_cloudharness_variables ()
18- self .volume_mounts = volume_mounts
20+ self .volume_mounts = volume_mounts or []
1921 self .external_volumes = [
2022 v .split (':' )[0 ] for v in self .volume_mounts if volume_requires_affinity (v )]
2123 for k in env_args :
2224 self .__envs [k ] = str (env_args [k ])
25+ self .retry_limit = retry_limit
26+ self .template_overrides = template_overrides .to_dict () if template_overrides else {}
2327
2428 def metadata_spec (self ):
2529 return {
@@ -85,14 +89,14 @@ def volumes_mounts_spec(self):
8589
8690class ContainerizedTask (Task ):
8791
88- def __init__ (self , name , resources = {}, image_pull_policy = 'IfNotPresent' , command = None , ** env_args ):
89- super ().__init__ (name , resources , ** env_args )
92+ def __init__ (self , name , resources = {}, image_pull_policy = 'IfNotPresent' , command = None , retry_limit = 10 , template_overrides : V1alpha1Template = None , ** env_args ):
93+ super ().__init__ (name , resources , retry_limit = retry_limit , template_overrides = template_overrides , ** env_args )
9094 self .image_pull_policy = image_pull_policy
9195 self .command = command
9296
9397 def spec (self ):
9498
95- spec = {
99+ spec = dict_merge ( {
96100 'container' : {
97101 'image' : self .image_name ,
98102 'env' : self .envs ,
@@ -104,9 +108,12 @@ def spec(self):
104108 'metadata' : self .metadata_spec (),
105109 'name' : self .name ,
106110 'outputs' : {},
107- 'affinity' : self .affinity_spec ()
111+ 'affinity' : self .affinity_spec (),
112+ 'retryStrategy' : {
113+ 'limit' : self .retry_limit
114+ }
108115
109- }
116+ }, self . template_overrides , merge_none = False )
110117 if self .command is not None :
111118 spec ['container' ]['command' ] = self .command
112119 return spec
@@ -123,7 +130,7 @@ def __init__(self, name, source, **kwargs):
123130
124131 def spec (self ):
125132
126- return {
133+ return dict_merge ( {
127134 'name' : self .name ,
128135 'affinity' : self .affinity_spec (),
129136 'metadata' : self .metadata_spec (),
@@ -135,18 +142,18 @@ def spec(self):
135142 'volumeMounts' : self .volumes_mounts_spec (),
136143 'command' : [self .command ]
137144 }
138- }
145+ }, self . template_overrides , merge_none = False )
139146
140147 @property
141148 def command (self ):
142149 raise NotImplemented
143150
144151
145152class PythonTask (InlinedTask ):
146- def __init__ (self , name , func ):
153+ def __init__ (self , name , func , ** kwargs ):
147154 import inspect
148155 super ().__init__ (name , (inspect .getsource (
149- func ) + f"\n { func .__name__ } ()" ).strip ())
156+ func ) + f"\n { func .__name__ } ()" ).strip (), ** kwargs )
150157
151158 @property
152159 def image_name (self ):
0 commit comments