Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.cdap.cdap.common.lang.Exceptions;
import io.cdap.cdap.common.lang.InstantiatorFactory;
import io.cdap.cdap.common.lang.PropertyFieldSetter;
import io.cdap.cdap.common.logging.LoggingContext;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.namespace.NamespaceQueryAdmin;
import io.cdap.cdap.common.service.Retries;
Expand Down Expand Up @@ -328,29 +329,36 @@
ExecutorService executorService = createExecutor(1, executorTerminateLatch,
"action-" + node.getNodeId() + "-%d");

// Capture context for propagation
final LoggingContext parentLoggingContext = LoggingContextAccessor.getLoggingContext();

try {
// Run the action in new thread
Future<?> future = executorService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
SchedulableProgramType programType = node.getProgram().getProgramType();
String programName = node.getProgram().getProgramName();
String prettyProgramType = ProgramType.valueOf(programType.name()).getPrettyName();
ProgramWorkflowRunner programWorkflowRunner =
workflowProgramRunnerFactory.getProgramWorkflowRunner(programType, token,
node.getNodeId(), nodeStates);

// this should not happen, since null is only passed in from WorkflowDriver, only when calling configure
if (programWorkflowRunner == null) {
throw new UnsupportedOperationException("Operation not allowed.");
}
try (LoggingContextAccessor.LoggingContextRestorer ignored =
parentLoggingContext != null ? LoggingContextAccessor.setLoggingContext(parentLoggingContext)
: null) {
SchedulableProgramType programType = node.getProgram().getProgramType();
String programName = node.getProgram().getProgramName();
String prettyProgramType = ProgramType.valueOf(programType.name()).getPrettyName();
ProgramWorkflowRunner programWorkflowRunner =
workflowProgramRunnerFactory.getProgramWorkflowRunner(programType, token,
node.getNodeId(), nodeStates);

// this should not happen, since null is only passed in from WorkflowDriver, only when calling configure
if (programWorkflowRunner == null) {
throw new UnsupportedOperationException("Operation not allowed.");
}

Runnable programRunner = programWorkflowRunner.create(programName);
LOG.info("Starting {} Program '{}' in workflow", prettyProgramType, programName);
programRunner.run();
Runnable programRunner = programWorkflowRunner.create(programName);
LOG.info("Starting {} Program '{}' in workflow", prettyProgramType, programName);
programRunner.run();

LOG.info("{} Program '{}' in workflow completed", prettyProgramType, programName);
return null;
LOG.info("{} Program '{}' in workflow completed", prettyProgramType, programName);
return null;
}
Comment thread
riyaa14 marked this conversation as resolved.
}
});
future.get();
Expand All @@ -376,14 +384,22 @@
CompletionService<Map.Entry<String, WorkflowToken>> completionService =
new ExecutorCompletionService<>(executorService);

// Capture the logging context from the parent thread to propagate to fork branches
final LoggingContext parentLoggingContext = LoggingContextAccessor.getLoggingContext();

try {
for (final List<WorkflowNode> branch : fork.getBranches()) {
completionService.submit(new Callable<Map.Entry<String, WorkflowToken>>() {
@Override
public Map.Entry<String, WorkflowToken> call() throws Exception {
WorkflowToken copiedToken = ((BasicWorkflowToken) token).deepCopy();
executeAll(branch.iterator(), appSpec, instantiator, classLoader, copiedToken);
return Maps.immutableEntry(branch.toString(), copiedToken);
// Set the logging context on the worker thread
try (LoggingContextAccessor.LoggingContextRestorer ignored =
parentLoggingContext != null ? LoggingContextAccessor.setLoggingContext(parentLoggingContext)
: null) {
WorkflowToken copiedToken = ((BasicWorkflowToken) token).deepCopy();
executeAll(branch.iterator(), appSpec, instantiator, classLoader, copiedToken);
return Maps.immutableEntry(branch.toString(), copiedToken);
}
Comment thread
riyaa14 marked this conversation as resolved.
}
});
}
Expand Down Expand Up @@ -562,7 +578,7 @@

private DatasetProperties addLocalDatasetProperty(DatasetProperties properties,
boolean keepLocal) {
String dsDescription = properties.getDescription();

Check warning on line 581 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/workflow/WorkflowDriver.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck

Distance between variable 'dsDescription' declaration and its first usage is 9, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).
DatasetProperties.Builder builder = DatasetProperties.builder();
builder.addAll(properties.getProperties());
builder.add(Constants.AppFabric.WORKFLOW_LOCAL_DATASET_PROPERTY, "true");
Expand Down Expand Up @@ -707,7 +723,7 @@
*
* @param threads number of core threads in the executor
* @param terminationLatch a {@link CountDownLatch} that will be counted down when the
* executor terminated
* executor terminated
* @param threadNameFormat name format for the {@link ThreadFactory} provided to the executor
* @return a new {@link ExecutorService}.
*/
Expand Down
Loading