Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public CommitStats commit(WUProcessingSpec workSpec) {
JobState jobState = Help.loadJobState(workSpec, fs);

int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running Commit Activity"),
heartBeatExecutor.scheduleAtFixedRate(ExecutorsUtils.safeRunnable(() -> activityExecutionContext.heartbeat("Running Commit Activity")),
heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);

optJobName = Optional.ofNullable(jobState.getJobName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
log.info("Created jobState: {}", jobState.toJsonString(true));

int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits"),
heartBeatExecutor.scheduleAtFixedRate(ExecutorsUtils.safeRunnable(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits")),
heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);

Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public int processWorkUnit(WorkUnitClaimCheck wu) {
log.info("{} - loaded; found {} workUnits", correlator, workUnits.size());
JobState jobState = Help.loadJobState(wu, fs);
int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running ProcessWorkUnit Activity"),
heartBeatExecutor.scheduleAtFixedRate(ExecutorsUtils.safeRunnable(() -> activityExecutionContext.heartbeat("Running ProcessWorkUnit Activity")),
heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
troubleshooter.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,30 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.gobblin.util.executors.MDCPropagatingCallable;
import org.apache.gobblin.util.executors.MDCPropagatingRunnable;
import org.apache.gobblin.util.executors.MDCPropagatingScheduledExecutorService;
import org.slf4j.Logger;
import lombok.extern.slf4j.Slf4j;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.gobblin.util.executors.MDCPropagatingCallable;
import org.apache.gobblin.util.executors.MDCPropagatingExecutorService;
import org.apache.gobblin.util.executors.MDCPropagatingRunnable;
import org.apache.gobblin.util.executors.MDCPropagatingScheduledExecutorService;


/**
* A utility class to use with {@link java.util.concurrent.Executors} in cases such as when creating new thread pools.
*
* @author Yinan Li
*/
@Slf4j
public class ExecutorsUtils {

private static final ThreadFactory DEFAULT_THREAD_FACTORY = newThreadFactory(Optional.<Logger>absent());
Expand Down Expand Up @@ -162,6 +165,27 @@ public static Runnable loggingDecorator(Runnable runnable) {
return new MDCPropagatingRunnable(runnable);
}


/**
* Wraps a {@link Runnable} task with exception handling to ensure that
* any thrown exception does not terminate the thread or scheduled executor.
* This is useful for long-running or recurring tasks where resilience is critical.
*
* @param runnable the task to wrap
* @return a safe {@link Runnable} that logs and suppresses exceptions
*/
public static Runnable safeRunnable(Runnable runnable) {
return () -> {
try {
runnable.run();
} catch (Exception exception) {
// Catch all exceptions to prevent the thread from dying
// and log the exception
log.error("Caught exception in runnable {}", exception.getMessage());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we log stacktrace?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging stacktrace will bloat the log messages, only motivation to wrap the thread is to avoid killing of thread.
Added a log line in debug mode

}
};
}

/**
* Creates an {@link Callable<T>} which propagates the MDC
* information across thread boundaries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,26 @@ public String apply(Integer input) {

ExecutorsUtils.parallelize(nums, sleepAndMultiply, 2, 1, Optional.<Logger> absent());
}

@Test
public void testSafeRunnableRunsSuccessfully() {
Runnable mockRunnable = Mockito.mock(Runnable.class);
ExecutorsUtils.safeRunnable(mockRunnable).run();
Mockito.verify(mockRunnable, Mockito.times(1)).run();
}

@Test
public void testSafeRunnableHandlesException() {
Runnable mockRunnable = Mockito.mock(Runnable.class);
Mockito.doThrow(new RuntimeException("Test exception")).when(mockRunnable).run();
Runnable safeRunnable = ExecutorsUtils.safeRunnable(mockRunnable);
try {
safeRunnable.run();
safeRunnable.run();
Mockito.verify(mockRunnable, Mockito.times(2)).run();
} catch (Exception e) {
Assert.fail("Exception should not be thrown from safeRunnable");
}
}

}
Loading