diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowDriver.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowDriver.java index 455f8af0abab..6db1d95d8710 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowDriver.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowDriver.java @@ -62,6 +62,7 @@ import io.cdap.cdap.common.lang.Exceptions; import io.cdap.cdap.common.lang.InstantiatorFactory; import io.cdap.cdap.common.lang.PropertyFieldSetter; +import io.cdap.cdap.common.logging.LoggingContext; import io.cdap.cdap.common.logging.LoggingContextAccessor; import io.cdap.cdap.common.namespace.NamespaceQueryAdmin; import io.cdap.cdap.common.service.Retries; @@ -328,29 +329,36 @@ private void executeAction(WorkflowActionNode node, WorkflowToken token) throws ExecutorService executorService = createExecutor(1, executorTerminateLatch, "action-" + node.getNodeId() + "-%d"); + // Capture context for propagation + final LoggingContext parentLoggingContext = LoggingContextAccessor.getLoggingContext(); + try { // Run the action in new thread Future future = executorService.submit(new Callable() { @Override public Void call() throws Exception { - SchedulableProgramType programType = node.getProgram().getProgramType(); - String programName = node.getProgram().getProgramName(); - String prettyProgramType = ProgramType.valueOf(programType.name()).getPrettyName(); - ProgramWorkflowRunner programWorkflowRunner = - workflowProgramRunnerFactory.getProgramWorkflowRunner(programType, token, - node.getNodeId(), nodeStates); - - // this should not happen, since null is only passed in from WorkflowDriver, only when calling configure - if (programWorkflowRunner == null) { - throw new UnsupportedOperationException("Operation not allowed."); - } + try (LoggingContextAccessor.LoggingContextRestorer ignored = + parentLoggingContext != null ? LoggingContextAccessor.setLoggingContext(parentLoggingContext) + : null) { + SchedulableProgramType programType = node.getProgram().getProgramType(); + String programName = node.getProgram().getProgramName(); + String prettyProgramType = ProgramType.valueOf(programType.name()).getPrettyName(); + ProgramWorkflowRunner programWorkflowRunner = + workflowProgramRunnerFactory.getProgramWorkflowRunner(programType, token, + node.getNodeId(), nodeStates); + + // this should not happen, since null is only passed in from WorkflowDriver, only when calling configure + if (programWorkflowRunner == null) { + throw new UnsupportedOperationException("Operation not allowed."); + } - Runnable programRunner = programWorkflowRunner.create(programName); - LOG.info("Starting {} Program '{}' in workflow", prettyProgramType, programName); - programRunner.run(); + Runnable programRunner = programWorkflowRunner.create(programName); + LOG.info("Starting {} Program '{}' in workflow", prettyProgramType, programName); + programRunner.run(); - LOG.info("{} Program '{}' in workflow completed", prettyProgramType, programName); - return null; + LOG.info("{} Program '{}' in workflow completed", prettyProgramType, programName); + return null; + } } }); future.get(); @@ -376,14 +384,22 @@ private void executeFork(final ApplicationSpecification appSpec, WorkflowForkNod CompletionService> completionService = new ExecutorCompletionService<>(executorService); + // Capture the logging context from the parent thread to propagate to fork branches + final LoggingContext parentLoggingContext = LoggingContextAccessor.getLoggingContext(); + try { for (final List branch : fork.getBranches()) { completionService.submit(new Callable>() { @Override public Map.Entry call() throws Exception { - WorkflowToken copiedToken = ((BasicWorkflowToken) token).deepCopy(); - executeAll(branch.iterator(), appSpec, instantiator, classLoader, copiedToken); - return Maps.immutableEntry(branch.toString(), copiedToken); + // Set the logging context on the worker thread + try (LoggingContextAccessor.LoggingContextRestorer ignored = + parentLoggingContext != null ? LoggingContextAccessor.setLoggingContext(parentLoggingContext) + : null) { + WorkflowToken copiedToken = ((BasicWorkflowToken) token).deepCopy(); + executeAll(branch.iterator(), appSpec, instantiator, classLoader, copiedToken); + return Maps.immutableEntry(branch.toString(), copiedToken); + } } }); } @@ -707,7 +723,7 @@ public List get() { * * @param threads number of core threads in the executor * @param terminationLatch a {@link CountDownLatch} that will be counted down when the - * executor terminated + * executor terminated * @param threadNameFormat name format for the {@link ThreadFactory} provided to the executor * @return a new {@link ExecutorService}. */