1818package org .apache .gobblin .azkaban ;
1919
2020import java .io .IOException ;
21+ import java .time .ZoneOffset ;
22+ import java .time .ZonedDateTime ;
23+ import java .util .Collections ;
2124import java .util .List ;
25+ import java .util .Map ;
2226import java .util .Properties ;
27+ import java .util .UUID ;
2328import java .util .concurrent .TimeoutException ;
2429
30+ import org .apache .commons .codec .digest .DigestUtils ;
31+ import org .apache .commons .lang3 .exception .ExceptionUtils ;
2532import org .apache .hadoop .yarn .conf .YarnConfiguration ;
2633import org .apache .log4j .Level ;
2734import org .apache .log4j .Logger ;
2835
36+ import com .google .common .annotations .VisibleForTesting ;
37+ import com .google .common .collect .Maps ;
2938import com .typesafe .config .Config ;
39+ import com .typesafe .config .ConfigFactory ;
40+ import com .typesafe .config .ConfigValue ;
3041import com .typesafe .config .ConfigValueFactory ;
3142
3243import azkaban .jobExecutor .AbstractJob ;
3344import lombok .Getter ;
3445
46+ import org .apache .gobblin .configuration .ConfigurationKeys ;
47+ import org .apache .gobblin .configuration .DynamicConfigGenerator ;
48+ import org .apache .gobblin .instrumented .Instrumented ;
49+ import org .apache .gobblin .metrics .MetricContext ;
50+ import org .apache .gobblin .metrics .event .EventSubmitter ;
51+ import org .apache .gobblin .metrics .event .TimingEvent ;
52+ import org .apache .gobblin .runtime .DynamicConfigGeneratorFactory ;
53+ import org .apache .gobblin .runtime .app .ApplicationLauncher ;
54+ import org .apache .gobblin .runtime .app .ServiceBasedAppLauncher ;
55+ import org .apache .gobblin .runtime .troubleshooter .Issue ;
56+ import org .apache .gobblin .runtime .troubleshooter .IssueEventBuilder ;
57+ import org .apache .gobblin .runtime .troubleshooter .IssueSeverity ;
3558import org .apache .gobblin .util .AzkabanLauncherUtils ;
3659import org .apache .gobblin .util .ConfigUtils ;
3760import org .apache .gobblin .yarn .GobblinYarnAppLauncher ;
5679public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
5780 private static final Logger LOGGER = Logger .getLogger (AzkabanGobblinYarnAppLauncher .class );
5881
82+ /** Config prefix for the {@link ServiceBasedAppLauncher} used for bootstrap-failure event reporting. */
83+ private static final String APP_LAUNCHER_PREFIX = "gobblinYarnAppLauncher" ;
84+
5985 private final GobblinYarnAppLauncher gobblinYarnAppLauncher ;
86+ private final Properties gobblinProps ;
87+ private final ApplicationLauncher applicationLauncher ;
88+ private final EventSubmitter eventSubmitter ;
6089
6190 @ Getter
6291 protected final YarnConfiguration yarnConfiguration ;
6392
64- public AzkabanGobblinYarnAppLauncher (String jobId , Properties gobblinProps )
65- throws IOException {
93+ public AzkabanGobblinYarnAppLauncher (String jobId , Properties gobblinProps ) throws Exception {
6694 super (jobId , LOGGER );
6795 gobblinProps = AzkabanLauncherUtils .undoPlaceholderConversion (gobblinProps );
6896 addRuntimeProperties (gobblinProps );
97+ this .gobblinProps = gobblinProps ;
6998
70- Config gobblinConfig = ConfigUtils .propertiesToConfig (gobblinProps );
99+ // Populate correct SSL keystore path from the Hadoop token file so Xinfra reporters work in
100+ // the GGW pod (XinfraDynamicConfigGenerator writes the cert to a temp file and sets the path)
101+ Config propsAsConfig = ConfigUtils .propertiesToConfig (gobblinProps );
102+ DynamicConfigGenerator dynamicConfigGenerator =
103+ DynamicConfigGeneratorFactory .createDynamicConfigGenerator (propsAsConfig );
104+ Config dynamicConfig = dynamicConfigGenerator .generateDynamicConfig (propsAsConfig );
105+ for (Map .Entry <String , ConfigValue > entry : dynamicConfig .entrySet ()) {
106+ gobblinProps .put (entry .getKey (), entry .getValue ().unwrapped ().toString ());
107+ }
71108
72- //Suppress logs from classes that emit Yarn application Id that Azkaban uses to kill the application.
73- setLogLevelForClasses ( gobblinConfig );
109+ this . applicationLauncher = buildApplicationLauncher ( gobblinProps );
110+ this . eventSubmitter = buildEventSubmitter ( );
74111
112+ Config gobblinConfig = ConfigUtils .propertiesToConfig (gobblinProps );
113+ setLogLevelForClasses (gobblinConfig );
75114 yarnConfiguration = initYarnConf (gobblinProps );
76-
77115 gobblinConfig = gobblinConfig .withValue (GobblinYarnAppLauncher .GOBBLIN_YARN_APP_LAUNCHER_MODE ,
78116 ConfigValueFactory .fromAnyRef (GobblinYarnAppLauncher .AZKABAN_APP_LAUNCHER_MODE_KEY ));
79117 this .gobblinYarnAppLauncher = getYarnAppLauncher (gobblinConfig );
80118 }
81119
120+ @ VisibleForTesting
121+ AzkabanGobblinYarnAppLauncher (String jobId , Properties gobblinProps ,
122+ GobblinYarnAppLauncher gobblinYarnAppLauncher , ApplicationLauncher applicationLauncher ,
123+ EventSubmitter eventSubmitter ) throws IOException {
124+ super (jobId , LOGGER );
125+ this .gobblinProps = gobblinProps ;
126+ this .gobblinYarnAppLauncher = gobblinYarnAppLauncher ;
127+ this .applicationLauncher = applicationLauncher ;
128+ this .eventSubmitter = eventSubmitter ;
129+ this .yarnConfiguration = new YarnConfiguration ();
130+ }
131+
132+ /**
133+ * Creates and starts a {@link ServiceBasedAppLauncher} scoped for bootstrap-failure event reporting.
134+ * Custom metric reporters that require YARN-container-specific SSL setup are temporarily removed
135+ * so the launcher starts cleanly in GGW pods, then restored for the actual Gobblin job launcher.
136+ */
137+ private ApplicationLauncher buildApplicationLauncher (Properties props ) throws Exception {
138+ String customBuilders = props .containsKey (ConfigurationKeys .METRICS_CUSTOM_BUILDERS )
139+ ? (String ) props .remove (ConfigurationKeys .METRICS_CUSTOM_BUILDERS ) : null ;
140+ try {
141+ Config fullConfig = ConfigUtils .propertiesToConfig (props );
142+ Config appConfig =
143+ ConfigUtils .getConfigOrEmpty (fullConfig , APP_LAUNCHER_PREFIX ).withFallback (fullConfig );
144+ ApplicationLauncher launcher = new ServiceBasedAppLauncher (
145+ ConfigUtils .configToProperties (appConfig ), "GobblinYarnAppLauncher-" + UUID .randomUUID ());
146+ launcher .start ();
147+ return launcher ;
148+ } finally {
149+ if (customBuilders != null ) {
150+ props .put (ConfigurationKeys .METRICS_CUSTOM_BUILDERS , customBuilders );
151+ }
152+ }
153+ }
154+
155+ /**
156+ * Builds an {@link EventSubmitter} from the {@link MetricContext} wired up by
157+ * {@link #buildApplicationLauncher}, following the pattern used by {@code IvyJobLauncherTemporal}.
158+ */
159+ private EventSubmitter buildEventSubmitter () {
160+ MetricContext metricContext =
161+ Instrumented .getMetricContext (ConfigUtils .configToState (ConfigFactory .empty ()), getClass ());
162+ return new EventSubmitter .Builder (metricContext , "org.apache.gobblin.service" ).build ();
163+ }
164+
82165 protected GobblinYarnAppLauncher getYarnAppLauncher (Config gobblinConfig )
83166 throws IOException {
84167 GobblinYarnAppLauncher gobblinYarnAppLauncher = new GobblinYarnAppLauncher (gobblinConfig , this .yarnConfiguration );
@@ -116,7 +199,16 @@ protected void addRuntimeProperties(Properties gobblinProps) {
116199
117200 @ Override
118201 public void run () throws Exception {
119- this .gobblinYarnAppLauncher .launch ();
202+ try {
203+ this .gobblinYarnAppLauncher .launch ();
204+ } catch (Exception e ) {
205+ LOGGER .error ("Failed to launch the Gobblin Yarn application" , e );
206+ submitJobFailedEvent (e );
207+ throw e ;
208+ } finally {
209+ // Triggers MetricsReportingService shutdown which flushes events to Kafka before process exits.
210+ this .applicationLauncher .stop ();
211+ }
120212
121213 Runtime .getRuntime ().addShutdownHook (new Thread () {
122214
@@ -138,4 +230,51 @@ public void run() {
138230 public void cancel () throws Exception {
139231 this .gobblinYarnAppLauncher .stop ();
140232 }
233+
234+ /**
235+ * Emits a JOB_FAILED timing event and an {@link IssueEventBuilder} so GaaS transitions the job
236+ * to FAILED and populates {@code issues[]} with the bootstrap failure root cause.
237+ */
238+ private void submitJobFailedEvent (Exception e ) {
239+ this .eventSubmitter .getTimingEvent (TimingEvent .LauncherTimings .JOB_FAILED ).stop (getFlowMetadata ());
240+
241+ // Emit an IssueEventBuilder so the error details appear in the issues[] array in GaaS
242+ String summary = "YARN AM bootstrap failed: " + ExceptionUtils .getRootCauseMessage (e );
243+ Issue issue = Issue .builder ()
244+ .time (ZonedDateTime .now (ZoneOffset .UTC ))
245+ .severity (IssueSeverity .ERROR )
246+ .code ("T" + DigestUtils .sha256Hex (summary ).substring (0 , 6 ).toUpperCase ())
247+ .summary (summary )
248+ .details (ExceptionUtils .getStackTrace (e ))
249+ .sourceClass (AzkabanGobblinYarnAppLauncher .class .getName ())
250+ .properties (Collections .emptyMap ())
251+ .build ();
252+ IssueEventBuilder issueEventBuilder = new IssueEventBuilder (IssueEventBuilder .JOB_ISSUE );
253+ issueEventBuilder .setIssue (issue );
254+ issueEventBuilder .addMetadata (TimingEvent .FlowEventConstants .FLOW_GROUP_FIELD ,
255+ this .gobblinProps .getProperty (ConfigurationKeys .FLOW_GROUP_KEY , "" ));
256+ issueEventBuilder .addMetadata (TimingEvent .FlowEventConstants .FLOW_NAME_FIELD ,
257+ this .gobblinProps .getProperty (ConfigurationKeys .FLOW_NAME_KEY , "" ));
258+ issueEventBuilder .addMetadata (TimingEvent .FlowEventConstants .FLOW_EXECUTION_ID_FIELD ,
259+ this .gobblinProps .getProperty (ConfigurationKeys .FLOW_EXECUTION_ID_KEY , "" ));
260+ issueEventBuilder .addMetadata (TimingEvent .FlowEventConstants .JOB_NAME_FIELD ,
261+ this .gobblinProps .getProperty (ConfigurationKeys .JOB_NAME_KEY , "" ));
262+ this .eventSubmitter .submit (issueEventBuilder );
263+ }
264+
265+ /** Returns flow/job metadata tags from job properties for timing event metadata. */
266+ private Map <String , String > getFlowMetadata () {
267+ Map <String , String > metadata = Maps .newHashMap ();
268+ metadata .put (TimingEvent .FlowEventConstants .FLOW_GROUP_FIELD ,
269+ this .gobblinProps .getProperty (ConfigurationKeys .FLOW_GROUP_KEY , "" ));
270+ metadata .put (TimingEvent .FlowEventConstants .FLOW_NAME_FIELD ,
271+ this .gobblinProps .getProperty (ConfigurationKeys .FLOW_NAME_KEY , "" ));
272+ metadata .put (TimingEvent .FlowEventConstants .FLOW_EXECUTION_ID_FIELD ,
273+ this .gobblinProps .getProperty (ConfigurationKeys .FLOW_EXECUTION_ID_KEY , "" ));
274+ metadata .put (TimingEvent .FlowEventConstants .JOB_GROUP_FIELD ,
275+ this .gobblinProps .getProperty (ConfigurationKeys .JOB_GROUP_KEY , "" ));
276+ metadata .put (TimingEvent .FlowEventConstants .JOB_NAME_FIELD ,
277+ this .gobblinProps .getProperty (ConfigurationKeys .JOB_NAME_KEY , "" ));
278+ return metadata ;
279+ }
141280}
0 commit comments