From b877a0af4e49a05eef1cfb046db037bea024f6ca Mon Sep 17 00:00:00 2001 From: Dheeraj Kholia Date: Thu, 22 Jan 2026 12:07:37 +0530 Subject: [PATCH 1/2] Upgrading guava version to 32.0.0-jre --- pom.xml | 6 +- .../AbstractExecutionServiceController.java | 65 +++++++++----- .../internal/AbstractTwillController.java | 9 +- .../twill/internal/AbstractTwillService.java | 24 ++--- .../internal/AbstractZKServiceController.java | 2 +- .../twill/internal/CompositeService.java | 4 +- .../twill/internal/ElectionRegistry.java | 6 +- .../twill/internal/ListenerExecutor.java | 2 +- .../internal/ServiceListenerAdapter.java | 2 +- .../org/apache/twill/internal/Services.java | 50 +++++++++-- .../internal/TwillContainerLauncher.java | 8 +- .../org/apache/twill/internal/ZKMessages.java | 5 +- .../twill/internal/json/ArgumentsCodec.java | 14 +-- .../kafka/client/ZKKafkaClientService.java | 4 +- .../twill/internal/logging/KafkaAppender.java | 4 +- .../twill/internal/state/SimpleMessage.java | 3 +- .../apache/twill/kafka/client/BrokerInfo.java | 3 +- .../twill/kafka/client/TopicPartition.java | 3 +- .../twill/internal/CompositeServiceTest.java | 8 +- .../apache/twill/internal/ControllerTest.java | 34 ++++---- .../apache/twill/internal/ServicesTest.java | 8 +- .../utils/ApplicationBundlerTest.java | 4 +- .../apache/twill/kafka/client/KafkaTest.java | 40 ++++----- .../discovery/ZKDiscoveryServiceTest.java | 14 +-- .../internal/yarn/Hadoop21YarnAMClient.java | 4 +- .../apache/twill/internal/ServiceMain.java | 4 +- .../appmaster/ApplicationMasterMain.java | 14 +-- .../appmaster/ApplicationMasterService.java | 52 +++++------ .../appmaster/RunnableProcessLauncher.java | 3 +- .../internal/appmaster/RunningContainers.java | 6 +- .../container/TwillContainerMain.java | 7 +- .../twill/yarn/YarnTwillController.java | 8 +- .../apache/twill/yarn/YarnTwillPreparer.java | 17 ++-- .../twill/yarn/YarnTwillRunnerService.java | 14 +-- .../org/apache/twill/yarn/BaseYarnTest.java | 4 +- .../apache/twill/yarn/EchoServerTestRun.java | 8 +- .../twill/yarn/EventHandlerTestRun.java | 4 +- .../twill/yarn/LogLevelChangeTestRun.java | 4 +- .../apache/twill/yarn/LogLevelTestRun.java | 4 +- .../twill/yarn/ResourceReportTestRun.java | 4 +- .../twill/yarn/RestartRunnableTestRun.java | 8 +- .../twill/yarn/SessionExpireTestRun.java | 4 +- .../org/apache/twill/yarn/TwillTester.java | 4 +- .../zookeeper/DefaultZKClientService.java | 45 +++++++--- .../zookeeper/FailureRetryZKClient.java | 20 +++-- .../internal/zookeeper/InMemoryZKServer.java | 52 ++++++++--- .../internal/zookeeper/LeaderElection.java | 4 +- .../internal/zookeeper/NamespaceZKClient.java | 2 +- .../zookeeper/ReentrantDistributedLock.java | 26 +++--- .../zookeeper/RewatchOnExpireWatcher.java | 7 +- .../zookeeper/RewatchOnExpireZKClient.java | 7 +- .../zookeeper/ForwardingZKClientService.java | 47 ++++++---- .../apache/twill/zookeeper/ZKOperations.java | 7 +- .../zookeeper/LeaderElectionTest.java | 56 ++++++------ .../ReentrantDistributedLockTest.java | 34 ++++---- .../apache/twill/zookeeper/ZKClientTest.java | 87 ++++++++++--------- .../twill/zookeeper/ZKOperationsTest.java | 8 +- 57 files changed, 516 insertions(+), 381 deletions(-) diff --git a/pom.xml b/pom.xml index d44cbc29..f6cc7169 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,7 @@ true 1.7.30 1.2.11 - 13.0.1 + 32.0.0-jre 2.2.4 2.0.1 4.1.75.Final @@ -222,7 +222,7 @@ - + diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java index a00ec6ee..e7e053e7 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; @@ -93,9 +94,10 @@ public Future terminate(long gracefulTimeout, TimeU } terminationTimeoutMillis.compareAndSet(-1L, timeout); - stop(); + stopAsync(); return Futures.transform(terminationFuture, - (Function) input -> AbstractExecutionServiceController.this); + (Function) input -> AbstractExecutionServiceController.this, + MoreExecutors.directExecutor()); } @Nullable @@ -130,13 +132,22 @@ public void terminated(State from) { } @Override - public void awaitTerminated() throws ExecutionException { - Uninterruptibles.getUninterruptibly(terminationFuture); + public void awaitTerminated() { + try { + Uninterruptibles.getUninterruptibly(terminationFuture); + } catch (ExecutionException e) { + throw new IllegalStateException(e.getCause()); + } } @Override - public void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException, ExecutionException { - Uninterruptibles.getUninterruptibly(terminationFuture, timeout, timeoutUnit); + public void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException { + try { + Uninterruptibles.getUninterruptibly(terminationFuture, timeout, timeoutUnit); + } catch (ExecutionException e) { + // Consuming ExecutionException to match interface or rethrow runtime + throw new IllegalStateException(e.getCause()); + } } public final void addListener(Listener listener, Executor executor) { @@ -144,41 +155,49 @@ public final void addListener(Listener listener, Executor executor) { } @Override - public final ListenableFuture start() { + public final Service startAsync() { serviceDelegate.addListener(listenerExecutors, Threads.SAME_THREAD_EXECUTOR); - return serviceDelegate.start(); + return serviceDelegate.startAsync(); } @Override - public final State startAndWait() { - return Futures.getUnchecked(start()); + public final Service stopAsync() { + serviceDelegate.stopAsync(); + return this; } @Override - public final boolean isRunning() { - return serviceDelegate.isRunning(); + public final void awaitRunning() { + serviceDelegate.awaitRunning(); } @Override - public final State state() { - return serviceDelegate.state(); + public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + serviceDelegate.awaitRunning(timeout, unit); } @Override - public final State stopAndWait() { - return Futures.getUnchecked(stop()); + public final Throwable failureCause() { + return serviceDelegate.failureCause(); } @Override - public final ListenableFuture stop() { - return serviceDelegate.stop(); + public final boolean isRunning() { + return serviceDelegate.isRunning(); } - protected Executor executor(final State state) { + @Override + public final State state() { + return serviceDelegate.state(); + } + + + + protected Executor executor() { return new Executor() { @Override public void execute(Runnable command) { - Thread t = new Thread(command, getClass().getSimpleName() + " " + state); + Thread t = new Thread(command, getClass().getSimpleName() + " " + state()); t.setDaemon(true); t.start(); } @@ -213,15 +232,15 @@ protected void shutDown() throws Exception { } @Override - protected Executor executor(State state) { - return AbstractExecutionServiceController.this.executor(state); + protected Executor executor() { + return AbstractExecutionServiceController.this.executor(); } } /** * Inner class for dispatching listener call back to a list of listeners. */ - private static final class ListenerExecutors implements Listener { + private static final class ListenerExecutors extends Listener { private interface Callback { void call(Listener listener); diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java index 0ff2fc8c..68fc83ae 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java @@ -38,6 +38,7 @@ import org.apache.twill.api.logging.LogHandler; import org.apache.twill.api.logging.LogThrowable; import org.apache.twill.common.Cancellable; +import org.apache.twill.common.Threads; import org.apache.twill.discovery.ServiceDiscovered; import org.apache.twill.discovery.ZKDiscoveryService; import org.apache.twill.internal.json.LogEntryDecoder; @@ -102,7 +103,7 @@ public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, b @Override protected synchronized void doStartUp() { if (kafkaClient != null && !logHandlers.isEmpty()) { - kafkaClient.startAndWait(); + kafkaClient.startAsync().awaitRunning(); logCancellable = kafkaClient.getConsumer().prepare() .addFromBeginning(Constants.LOG_TOPIC, 0) .consume(new LogMessageCallback(logHandlers)); @@ -119,7 +120,7 @@ protected synchronized void doShutDown() { } if (kafkaClient != null) { // Safe to call stop no matter what state the KafkaClientService is in. - kafkaClient.stopAndWait(); + kafkaClient.stopAsync().awaitTerminated(); } } @@ -133,7 +134,7 @@ public final synchronized void addLogHandler(LogHandler handler) { logHandlers.add(handler); if (logHandlers.size() == 1) { - kafkaClient.startAndWait(); + kafkaClient.startAsync().awaitRunning(); logCancellable = kafkaClient.getConsumer().prepare() .addFromBeginning(Constants.LOG_TOPIC, 0) .consume(new LogMessageCallback(logHandlers)); @@ -198,7 +199,7 @@ public ListenableFuture restartInstances(final String runnable, Set input) { return runnable; } - }); + }, Threads.SAME_THREAD_EXECUTOR); } @Override diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java index 31adabce..ceb78ae7 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -144,7 +145,7 @@ protected Gson getLiveNodeGson() { @Override public ListenableFuture onReceived(String messageId, Message message) { LOG.info("Message received: {}", message); - return Futures.immediateCheckedFuture(messageId); + return Futures.immediateFuture(messageId); } @Override @@ -164,10 +165,10 @@ protected final void startUp() throws Exception { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.Expired) { - LOG.warn("ZK Session expired for service {} with runId {}.", getServiceName(), runId.getId()); + LOG.warn("ZK Session expired for service {} with runId {}.", serviceName(), runId.getId()); expired = true; } else if (event.getState() == Event.KeeperState.SyncConnected && expired) { - LOG.info("Reconnected after expiration for service {} with runId {}", getServiceName(), runId.getId()); + LOG.info("Reconnected after expiration for service {} with runId {}", serviceName(), runId.getId()); expired = false; logIfFailed(createLiveNode()); } @@ -204,7 +205,7 @@ protected final void shutDown() throws Exception { } finally { // Given at most 5 seconds to cleanup ZK nodes removeLiveNode().get(5, TimeUnit.SECONDS); - LOG.info("Service {} with runId {} shutdown completed", getServiceName(), runId.getId()); + LOG.info("Service {} with runId {} shutdown completed", serviceName(), runId.getId()); } } @@ -327,19 +328,20 @@ private boolean handleStopMessage(Message message, final Runnable messageRemover Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS); terminationTimeoutMillis.compareAndSet(-1L, timeoutMillis); - // Stop this service. - Futures.addCallback(stop(), new FutureCallback() { + addListener(new Listener() { @Override - public void onSuccess(State result) { + public void terminated(State from) { messageRemover.run(); } @Override - public void onFailure(Throwable t) { - LOG.error("Stop service failed upon STOP command", t); + public void failed(State from, Throwable failure) { + LOG.error("Stop service failed upon STOP command", failure); messageRemover.run(); } - }, Threads.SAME_THREAD_EXECUTOR); + }, MoreExecutors.directExecutor()); + // Stop this service. + stopAsync(); return true; } @@ -391,7 +393,7 @@ public void onSuccess(T result) { @Override public void onFailure(Throwable t) { - LOG.error("Operation failed for service {} with runId {}", getServiceName(), runId, t); + LOG.error("Operation failed for service {} with runId {}", serviceName(), runId, t); } }, Threads.SAME_THREAD_EXECUTOR); } diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java index 214fbb86..74066e56 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java @@ -154,7 +154,7 @@ protected synchronized void forceShutDown() { // In force shutdown, don't send message. stopMessageFuture = Futures.immediateFuture(State.TERMINATED); } - stop(); + stopAsync(); } diff --git a/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java b/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java index 38659f68..659dc3c7 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java @@ -53,7 +53,7 @@ protected void startUp() throws Exception { for (Service service : services) { try { - service.startAndWait(); + service.startAsync().awaitRunning(); } catch (UncheckedExecutionException e) { failureCause = e.getCause(); break; @@ -88,7 +88,7 @@ private void stopAll() throws Exception { Service service = itor.next(); try { if (service.isRunning() || service.state() == State.STARTING) { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } } catch (UncheckedExecutionException e) { // Just catch as we want all services stopped diff --git a/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java b/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java index 9153fe62..03dc6bbc 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ElectionRegistry.java @@ -46,7 +46,7 @@ public ElectionRegistry(ZKClient zkClient) { */ public Cancellable register(String name, ElectionHandler handler) { LeaderElection election = new LeaderElection(zkClient, name, handler); - election.start(); + election.startAsync(); registry.put(name, election); return new CancellableElection(name, election); } @@ -56,7 +56,7 @@ public Cancellable register(String name, ElectionHandler handler) { */ public void shutdown() { for (LeaderElection election : registry.values()) { - election.stop(); + election.stopAsync(); } } @@ -71,7 +71,7 @@ public CancellableElection(String name, LeaderElection election) { @Override public void cancel() { - election.stop(); + election.stopAsync(); registry.remove(name, election); } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java b/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java index 9d3e1560..0e93016c 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ListenerExecutor.java @@ -29,7 +29,7 @@ * Wrapper for {@link Service.Listener} to have callback executed on a given {@link Executor}. * Also make sure each method is called at most once. */ -final class ListenerExecutor implements Service.Listener { +final class ListenerExecutor extends Service.Listener { private static final Logger LOG = LoggerFactory.getLogger(ListenerExecutor.class); diff --git a/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java index 4a34abf1..9d8c3142 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ServiceListenerAdapter.java @@ -22,7 +22,7 @@ /** * An adapter for implementing {@link Service.Listener} with all method default to no-op. */ -public abstract class ServiceListenerAdapter implements Service.Listener { +public abstract class ServiceListenerAdapter extends Service.Listener { @Override public void starting() { // No-op diff --git a/twill-core/src/main/java/org/apache/twill/internal/Services.java b/twill-core/src/main/java/org/apache/twill/internal/Services.java index e431be98..58556c4d 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/Services.java +++ b/twill-core/src/main/java/org/apache/twill/internal/Services.java @@ -20,9 +20,9 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.SettableFuture; -import org.apache.twill.common.Threads; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -82,7 +82,7 @@ public void terminated(Service.State from) { public void failed(Service.State from, Throwable failure) { resultFuture.setException(failure); } - }, Threads.SAME_THREAD_EXECUTOR); + }, MoreExecutors.directExecutor()); Service.State state = service.state(); if (state == Service.State.TERMINATED) { @@ -103,12 +103,50 @@ private static ListenableFuture>> doChain(b SettableFuture>> resultFuture = SettableFuture.create(); List> result = Lists.newArrayListWithCapacity(moreServices.length + 1); - ListenableFuture future = doStart ? firstService.start() : firstService.stop(); + // Create a future for the first service action + ListenableFuture future = createServiceFuture(firstService, doStart); future.addListener(createChainListener(future, moreServices, new AtomicInteger(0), result, resultFuture, doStart), - Threads.SAME_THREAD_EXECUTOR); + MoreExecutors.directExecutor()); return resultFuture; } + /** + * Helper method to start/stop a service and return a Future representing that action's completion. + * Replaces the old service.start() / service.stop() behavior. + */ + private static ListenableFuture createServiceFuture(Service service, boolean doStart) { + final SettableFuture future = SettableFuture.create(); + + // Add listener to capture completion state + service.addListener(new ServiceListenerAdapter() { + @Override + public void running() { + if (doStart) { + future.set(Service.State.RUNNING); + } + } + + @Override + public void terminated(Service.State from) { + if (!doStart) { + future.set(Service.State.TERMINATED); + } + } + + @Override + public void failed(Service.State from, Throwable failure) { + future.setException(failure); + } + }, MoreExecutors.directExecutor()); + // Trigger action + if (doStart) { + service.startAsync(); + } else { + service.stopAsync(); + } + return future; + } + /** * Returns a {@link Runnable} that can be used as a {@link ListenableFuture} listener to trigger * further service action or completing the result future. Used by @@ -129,9 +167,9 @@ public void run() { resultFuture.set(result); return; } - ListenableFuture actionFuture = doStart ? services[nextIdx].start() : services[nextIdx].stop(); + ListenableFuture actionFuture = createServiceFuture(services[nextIdx], doStart); actionFuture.addListener(createChainListener(actionFuture, services, idx, result, resultFuture, doStart), - Threads.SAME_THREAD_EXECUTOR); + MoreExecutors.directExecutor()); } }; } diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java index 9bcbdf69..fdd62ce8 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java @@ -167,7 +167,7 @@ public TwillContainerController start(RunId runId, int instanceId, Class main TwillContainerControllerImpl controller = new TwillContainerControllerImpl(zkClient, runId, runtimeSpec.getName(), instanceId, processController); - controller.start(); + controller.startAsync(); return controller; } @@ -292,9 +292,9 @@ public int getInstanceId() { } private void killAndWait(long maxWaitSecs) { - Stopwatch watch = new Stopwatch(); + Stopwatch watch = Stopwatch.createStarted(); watch.start(); - while (watch.elapsedTime(TimeUnit.SECONDS) < maxWaitSecs) { + while (watch.elapsed(TimeUnit.SECONDS) < maxWaitSecs) { // Kill the application try { kill(); @@ -312,7 +312,7 @@ private void killAndWait(long maxWaitSecs) { // Timeout reached, runnable has not stopped LOG.error("Failed to kill runnable {}, instance {} after {} seconds", runnable, instanceId, - watch.elapsedTime(TimeUnit.SECONDS)); + watch.elapsed(TimeUnit.SECONDS)); // TODO: should we throw exception here? } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java b/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java index cfcc7e7f..5a18126f 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ZKMessages.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import org.apache.twill.common.Threads; import org.apache.twill.internal.state.Message; import org.apache.twill.internal.state.MessageCodec; import org.apache.twill.zookeeper.ZKClient; @@ -79,14 +80,14 @@ public void onSuccess(String result) { public void onFailure(Throwable t) { completion.setException(t); } - }); + }, Threads.SAME_THREAD_EXECUTOR); } @Override public void onFailure(Throwable t) { completion.setException(t); } - }); + }, Threads.SAME_THREAD_EXECUTOR); } private ZKMessages() { diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java index 341d3ee9..edf15d4a 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java +++ b/twill-core/src/main/java/org/apache/twill/internal/json/ArgumentsCodec.java @@ -18,8 +18,6 @@ package org.apache.twill.internal.json; import com.google.common.collect.ImmutableMultimap; -import com.google.common.io.InputSupplier; -import com.google.common.io.OutputSupplier; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -48,17 +46,13 @@ public final class ArgumentsCodec implements JsonSerializer, JsonDese private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Arguments.class, new ArgumentsCodec()) .create(); - public static void encode(Arguments arguments, OutputSupplier writerSupplier) throws IOException { - try (Writer writer = writerSupplier.getOutput()) { - GSON.toJson(arguments, writer); - } + public static void encode(Arguments arguments, Writer writer) throws IOException { + GSON.toJson(arguments, writer); } - public static Arguments decode(InputSupplier readerSupplier) throws IOException { - try (Reader reader = readerSupplier.getInput()) { - return GSON.fromJson(reader, Arguments.class); - } + public static Arguments decode(Reader reader) throws IOException { + return GSON.fromJson(reader, Arguments.class); } @Override diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java index a0d93ec0..a712e3a1 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKKafkaClientService.java @@ -98,7 +98,7 @@ protected void startUp() throws Exception { scheduler.scheduleAtFixedRate(this, PUBLISHER_CLEANUP_SECONDS, PUBLISHER_CLEANUP_SECONDS, TimeUnit.SECONDS); // Start broker service to get auto-updated brokers information. - brokerService.startAndWait(); + brokerService.startAsync().awaitRunning(); } @Override @@ -110,7 +110,7 @@ protected void shutDown() throws Exception { } consumer.stop(); - brokerService.stopAndWait(); + brokerService.stopAsync().awaitTerminated(); LOG.info("KafkaClientService stopped"); } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java index 493c4cae..5404ffec 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java +++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java @@ -218,7 +218,7 @@ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutExcept } try { - Stopwatch stopwatch = new Stopwatch(); + Stopwatch stopwatch = Stopwatch.createStarted(); stopwatch.start(); long publishTimeout = timeout; @@ -230,7 +230,7 @@ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutExcept } catch (ExecutionException e) { addError("Failed to publish logs to Kafka.", e); TimeUnit.NANOSECONDS.sleep(backOffTime); - publishTimeout -= stopwatch.elapsedTime(timeoutUnit); + publishTimeout -= stopwatch.elapsed(timeoutUnit); stopwatch.reset(); stopwatch.start(); } diff --git a/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java b/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java index 0465d467..9775f53d 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java +++ b/twill-core/src/main/java/org/apache/twill/internal/state/SimpleMessage.java @@ -17,6 +17,7 @@ */ package org.apache.twill.internal.state; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import org.apache.twill.api.Command; @@ -59,7 +60,7 @@ public Command getCommand() { @Override public String toString() { - return Objects.toStringHelper(Message.class) + return MoreObjects.toStringHelper(Message.class) .add("type", type) .add("scope", scope) .add("runnable", runnableName) diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java b/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java index e659ab74..bab45427 100644 --- a/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java +++ b/twill-core/src/main/java/org/apache/twill/kafka/client/BrokerInfo.java @@ -17,6 +17,7 @@ */ package org.apache.twill.kafka.client; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; /** @@ -57,7 +58,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(BrokerInfo.class) + return MoreObjects.toStringHelper(BrokerInfo.class) .add("host", host) .add("port", port) .toString(); diff --git a/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java b/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java index 87040bee..85facda3 100644 --- a/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java +++ b/twill-core/src/main/java/org/apache/twill/kafka/client/TopicPartition.java @@ -17,6 +17,7 @@ */ package org.apache.twill.kafka.client; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; /** @@ -62,7 +63,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("topic", topic) .add("partition", partition) .toString(); diff --git a/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java b/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java index 34f866c9..ec75fbe1 100644 --- a/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java +++ b/twill-core/src/test/java/org/apache/twill/internal/CompositeServiceTest.java @@ -49,7 +49,7 @@ public void testOrder() throws InterruptedException, ExecutionException, Timeout } Service service = new CompositeService(services); - service.start().get(5, TimeUnit.SECONDS); + service.startAsync().awaitRunning(5, TimeUnit.SECONDS); // There should be 10 permits after all 10 services started Assert.assertTrue(semaphore.tryAcquire(10, 5, TimeUnit.SECONDS)); @@ -59,7 +59,7 @@ public void testOrder() throws InterruptedException, ExecutionException, Timeout // Release 10 permits for the stop sequence to start semaphore.release(10); - service.stop().get(5, TimeUnit.SECONDS); + service.stopAsync().awaitTerminated(5, TimeUnit.SECONDS); // There should be no permit left after all 10 services stopped Assert.assertFalse(semaphore.tryAcquire(10)); @@ -80,9 +80,9 @@ public void testErrorStart() throws InterruptedException { Service service = new CompositeService(services); try { - service.start().get(); + service.startAsync().awaitRunning(); Assert.fail(); - } catch (ExecutionException e) { + } catch (IllegalStateException e) { // Expected } diff --git a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java index 1a5b7785..f639f6e9 100644 --- a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java +++ b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java @@ -50,17 +50,17 @@ public class ControllerTest { @Test public void testController() throws ExecutionException, InterruptedException, TimeoutException { InMemoryZKServer zkServer = InMemoryZKServer.builder().build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); LOG.info("ZKServer: " + zkServer.getConnectionStr()); try { RunId runId = RunIds.generate(); ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); Service service = createService(zkClientService, runId); - service.startAndWait(); + service.startAsync().awaitRunning(); TwillController controller = getController(zkClientService, "testController", runId); controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS); @@ -76,10 +76,10 @@ public void terminated(Service.State from) { Assert.assertTrue(service.state() == Service.State.TERMINATED || terminateLatch.await(2, TimeUnit.SECONDS)); - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -87,13 +87,13 @@ public void terminated(Service.State from) { @Test public void testControllerBefore() throws InterruptedException, ExecutionException, TimeoutException { InMemoryZKServer zkServer = InMemoryZKServer.builder().build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); LOG.info("ZKServer: " + zkServer.getConnectionStr()); try { RunId runId = RunIds.generate(); ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); final CountDownLatch runLatch = new CountDownLatch(1); TwillController controller = getController(zkClientService, "testControllerBefore", runId); @@ -105,7 +105,7 @@ public void run() { }, Threads.SAME_THREAD_EXECUTOR); Service service = createService(zkClientService, runId); - service.start(); + service.startAsync(); Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS)); @@ -116,11 +116,11 @@ public void run() { // Expected } - service.stop(); + service.stopAsync(); controller.awaitTerminated(120, TimeUnit.SECONDS); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -128,16 +128,16 @@ public void run() { @Test public void testControllerListener() throws InterruptedException { InMemoryZKServer zkServer = InMemoryZKServer.builder().build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); LOG.info("ZKServer: " + zkServer.getConnectionStr()); try { RunId runId = RunIds.generate(); ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); Service service = createService(zkClientService, runId); - service.startAndWait(); + service.startAsync().awaitRunning(); final CountDownLatch runLatch = new CountDownLatch(1); TwillController controller = getController(zkClientService, "testControllerListener", runId); @@ -150,11 +150,11 @@ public void run() { Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS)); - service.stopAndWait(); + service.stopAsync().awaitTerminated(); - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -212,7 +212,7 @@ public ResourceReport getResourceReport() { return null; } }; - controller.startAndWait(); + controller.startAsync().awaitTerminated(); return controller; } } diff --git a/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java b/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java index 058fb439..6539afe1 100644 --- a/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java +++ b/twill-core/src/test/java/org/apache/twill/internal/ServicesTest.java @@ -54,8 +54,8 @@ public void testCompletion() throws ExecutionException, InterruptedException { Service service = new DummyService("s1", new AtomicBoolean()); ListenableFuture completion = Services.getCompletionFuture(service); - service.start(); - service.stop(); + service.startAsync(); + service.stopAsync(); completion.get(); @@ -63,9 +63,9 @@ public void testCompletion() throws ExecutionException, InterruptedException { service = new DummyService("s2", transiting); completion = Services.getCompletionFuture(service); - service.startAndWait(); + service.startAsync().awaitRunning(); transiting.set(true); - service.stop(); + service.stopAsync(); try { completion.get(); diff --git a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java index 4609ebd9..1bdd5cf5 100644 --- a/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java +++ b/twill-core/src/test/java/org/apache/twill/internal/utils/ApplicationBundlerTest.java @@ -132,7 +132,9 @@ private void unjar(File jarFile, File targetDir) throws IOException { target.mkdirs(); } else { target.getParentFile().mkdirs(); - ByteStreams.copy(jarInput, Files.newOutputStreamSupplier(target)); + try (FileOutputStream output = new FileOutputStream(target)) { + ByteStreams.copy(jarInput, output); + } } jarEntry = jarInput.getNextJarEntry(); diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java index e7e7e7ca..b689b9a1 100644 --- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java +++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java @@ -65,11 +65,11 @@ public class KafkaTest { @BeforeClass public static void init() throws Exception { zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); // Extract the kafka.tgz and start the kafka server kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr())); - kafkaServer.startAndWait(); + kafkaServer.startAsync().awaitRunning(); zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); @@ -80,8 +80,8 @@ public static void init() throws Exception { @AfterClass public static void finish() throws Exception { Services.chainStop(kafkaClient, zkClientService).get(); - kafkaServer.stopAndWait(); - zkServer.stopAndWait(); + kafkaServer.stopAsync().awaitTerminated(); + zkServer.stopAsync().awaitTerminated(); } @Test @@ -91,15 +91,15 @@ public void testKafkaClientReconnect() throws Exception { EmbeddedKafkaServer server = new EmbeddedKafkaServer(kafkaServerConfig); ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/backoff").build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try { zkClient.create("/", null, CreateMode.PERSISTENT).get(); ZKKafkaClientService kafkaClient = new ZKKafkaClientService(zkClient); - kafkaClient.startAndWait(); + kafkaClient.startAsync().awaitRunning(); try { - server.startAndWait(); + server.startAsync().awaitRunning(); try { // Publish a messages createPublishThread(kafkaClient, topic, Compression.NONE, "First message", 1).start(); @@ -128,12 +128,12 @@ public void finished() { Assert.assertEquals("0 First message", queue.poll(60, TimeUnit.SECONDS)); // Shutdown the server - server.stopAndWait(); + server.stopAsync().awaitTerminated(); // Start the server again. // Needs to create a new instance with the same config since guava service cannot be restarted server = new EmbeddedKafkaServer(kafkaServerConfig); - server.startAndWait(); + server.startAsync().awaitRunning(); // Wait a little while to make sure changes is reflected in broker service TimeUnit.SECONDS.sleep(3); @@ -146,13 +146,13 @@ public void finished() { cancel.cancel(); } finally { - kafkaClient.stopAndWait(); + kafkaClient.stopAsync().awaitTerminated(); } } finally { - server.stopAndWait(); + server.stopAsync().awaitTerminated(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -247,17 +247,17 @@ public void testBrokerChange() throws Exception { // Create a new namespace in ZK for Kafka server for this test case String connectionStr = zkServer.getConnectionStr() + "/broker_change"; ZKClientService zkClient = ZKClientService.Builder.of(connectionStr).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); zkClient.create("/", null, CreateMode.PERSISTENT).get(); // Start a new kafka server File logDir = TMP_FOLDER.newFolder(); EmbeddedKafkaServer server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir)); - server.startAndWait(); + server.startAsync().awaitRunning(); // Start a Kafka client KafkaClientService kafkaClient = new ZKKafkaClientService(zkClient); - kafkaClient.startAndWait(); + kafkaClient.startAsync().awaitRunning(); // Attach a consumer final BlockingQueue consumedMessages = Queues.newLinkedBlockingQueue(); @@ -288,9 +288,9 @@ public void finished() { Assert.assertEquals("Message 0", consumedMessages.poll(5, TimeUnit.SECONDS)); // Now shutdown and restart the server on different port - server.stopAndWait(); + server.stopAsync().awaitTerminated(); server = new EmbeddedKafkaServer(generateKafkaConfig(connectionStr, logDir)); - server.startAndWait(); + server.startAsync().awaitRunning(); // Wait a little while to make sure changes is reflected in broker service TimeUnit.SECONDS.sleep(3); @@ -299,9 +299,9 @@ public void finished() { publisher.prepare("test").add(Charsets.UTF_8.encode("Message 1"), 0).send().get(); Assert.assertEquals("Message 1", consumedMessages.poll(5, TimeUnit.SECONDS)); - kafkaClient.stopAndWait(); - zkClient.stopAndWait(); - server.stopAndWait(); + kafkaClient.stopAsync().awaitTerminated(); + zkClient.stopAsync().awaitTerminated(); + server.stopAsync().awaitTerminated(); } private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java index dcf39351..5fd0bf07 100644 --- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java +++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java @@ -47,20 +47,20 @@ public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase { @BeforeClass public static void beforeClass() { zkServer = InMemoryZKServer.builder().setTickTime(100000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); zkClient = ZKClientServices.delegate( ZKClients.retryOnFailure( ZKClients.reWatchOnExpire( ZKClientService.Builder.of(zkServer.getConnectionStr()).build()), RetryStrategies.fixDelay(1, TimeUnit.SECONDS))); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); } @AfterClass public static void afterClass() { - zkClient.stopAndWait(); - zkServer.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); + zkServer.stopAsync().awaitTerminated(); } @Test (timeout = 30000) @@ -87,7 +87,7 @@ public void testDoubleRegister() throws Exception { ZKClients.reWatchOnExpire( ZKClientService.Builder.of(zkServer.getConnectionStr()).build()), RetryStrategies.fixDelay(1, TimeUnit.SECONDS))); - zkClient2.startAndWait(); + zkClient2.startAsync().awaitRunning(); try (ZKDiscoveryService discoveryService2 = new ZKDiscoveryService(zkClient2)) { cancellable2 = register(discoveryService2, "test_multi_client", "localhost", 54321); @@ -98,7 +98,7 @@ public void testDoubleRegister() throws Exception { public void run() { try { TimeUnit.SECONDS.sleep(2); - zkClient2.stopAndWait(); + zkClient2.stopAsync().awaitTerminated(); } catch (InterruptedException e) { LOG.error(e.getMessage(), e); } @@ -109,7 +109,7 @@ public void run() { cancellable = register(discoveryService, "test_multi_client", "localhost", 54321); cancellable.cancel(); } finally { - zkClient2.stopAndWait(); + zkClient2.stopAsync().awaitTerminated(); } } finally { closeServices(entry); diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java index 42bff62d..e6b4d4b1 100644 --- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java +++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java @@ -86,12 +86,12 @@ protected void startUp() throws Exception { trackerAddr.getPort(), trackerUrl.toString()); maxCapability = response.getMaximumResourceCapability(); - nmClient.startAndWait(); + nmClient.startAsync().awaitTerminated(); } @Override protected void shutDown() throws Exception { - nmClient.stopAndWait(); + nmClient.stopAsync().awaitTerminated(); amrmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, trackerUrl.toString()); amrmClient.stop(); } diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java index ca0bc080..52639915 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java @@ -81,7 +81,7 @@ protected final void doMain(final Service mainService, Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - mainService.stopAndWait(); + mainService.stopAsync().awaitTerminated(); } }); @@ -110,7 +110,7 @@ public void run() { throw Throwables.propagate(t); } } finally { - requiredServices.stopAndWait(); + requiredServices.stopAsync().awaitTerminated(); ILoggerFactory loggerFactory = LoggerFactory.getILoggerFactory(); if (loggerFactory instanceof LoggerContext) { diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java index 036be811..01e45dd2 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java @@ -169,7 +169,7 @@ protected void startUp() throws Exception { // no left over content from previous AM attempt. LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath); ZKOperations.createDeleteIfExists(zkClient, kafkaZKPath, null, CreateMode.PERSISTENT, true).get(); - kafkaServer.startAndWait(); + kafkaServer.startAsync().awaitRunning(); } @Override @@ -183,7 +183,7 @@ protected void shutDown() throws Exception { // Ignore LOG.info("Kafka shutdown delay interrupted", e); } finally { - kafkaServer.stopAndWait(); + kafkaServer.stopAsync().awaitTerminated(); } } @@ -230,13 +230,13 @@ private YarnAMClientService(YarnAMClient yarnAMClient, TrackerService trackerSer @Override protected void startUp() throws Exception { trackerService.setHost(yarnAMClient.getHost()); - trackerService.startAndWait(); + trackerService.startAsync().awaitRunning(); yarnAMClient.setTracker(trackerService.getBindAddress(), trackerService.getUrl()); try { - yarnAMClient.startAndWait(); + yarnAMClient.startAsync().awaitRunning(); } catch (Exception e) { - trackerService.stopAndWait(); + trackerService.stopAsync().awaitTerminated(); throw e; } } @@ -244,9 +244,9 @@ protected void startUp() throws Exception { @Override protected void shutDown() throws Exception { try { - yarnAMClient.stopAndWait(); + yarnAMClient.stopAsync().awaitTerminated(); } finally { - trackerService.stopAndWait(); + trackerService.stopAsync().awaitTerminated(); } } } diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index 6649bf41..ce68148d 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -21,14 +21,15 @@ import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Supplier; -import com.google.common.collect.DiscreteDomains; +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.DiscreteDomain; import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multiset; -import com.google.common.collect.Ranges; +import com.google.common.collect.Range; import com.google.common.collect.Sets; import com.google.common.reflect.TypeToken; import com.google.common.util.concurrent.Futures; @@ -36,6 +37,27 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import java.io.File; +import java.io.IOException; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; @@ -84,28 +106,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.io.Reader; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; - /** * The class that acts as {@code ApplicationMaster} for Twill applications. */ @@ -636,7 +636,7 @@ private long checkProvisionTimeout(long nextTimeoutCheck) { if (action.getTimeout() < 0) { // Abort application stopStatus = StopStatus.ABORTED; - stop(); + stopAsync(); } else { return nextTimeoutCheck + action.getTimeout(); } @@ -1023,7 +1023,7 @@ public void run() { int runningCount = runningContainers.count(runnableName); Set instancesToRemove = instanceIds == null ? null : ImmutableSet.copyOf(instanceIds); if (instancesToRemove == null) { - instancesToRemove = Ranges.closedOpen(0, runningCount).asSet(DiscreteDomains.integers()); + instancesToRemove = ContiguousSet.create(Range.closedOpen(0, runningCount), DiscreteDomain.integers()); } LOG.info("Restarting instances {} for runnable {}", instancesToRemove, runnableName); diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java index e48dcbb6..454ab14c 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java @@ -17,6 +17,7 @@ */ package org.apache.twill.internal.appmaster; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.collect.Maps; import org.apache.twill.common.Cancellable; @@ -50,7 +51,7 @@ public RunnableProcessLauncher(YarnContainerInfo containerInfo, YarnNMClient nmC @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("container", containerInfo) .toString(); } diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java index f8ed27ec..58b0a520 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Service; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import java.nio.charset.StandardCharsets; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.twill.api.EventHandler; import org.apache.twill.api.ResourceReport; @@ -43,6 +44,7 @@ import org.apache.twill.api.RuntimeSpecification; import org.apache.twill.api.TwillRunResources; import org.apache.twill.api.logging.LogEntry; +import org.apache.twill.common.Threads; import org.apache.twill.filesystem.Location; import org.apache.twill.internal.Constants; import org.apache.twill.internal.ContainerExitCodes; @@ -581,7 +583,7 @@ public void onFailure(Throwable t) { } } } - }); + }, Threads.SAME_THREAD_EXECUTOR); } /** @@ -735,7 +737,7 @@ private Location saveLogLevels() { try { Gson gson = new GsonBuilder().serializeNulls().create(); String jsonStr = gson.toJson(logLevels); - String fileName = Hashing.md5().hashString(jsonStr) + "." + Constants.Files.LOG_LEVELS; + String fileName = Hashing.md5().hashString(jsonStr, StandardCharsets.UTF_8) + "." + Constants.Files.LOG_LEVELS; Location location = applicationLocation.append(fileName); if (!location.exists()) { try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) { diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java index e6d86a55..7b443117 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.AbstractService; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import java.nio.charset.StandardCharsets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.security.Credentials; @@ -188,8 +189,10 @@ private static Map> loadLogLevels() throws IOExcepti } private static Arguments decodeArgs() throws IOException { - return ArgumentsCodec.decode( - Files.newReaderSupplier(new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.ARGUMENTS), Charsets.UTF_8)); + File argsFile = new File(Constants.Files.RUNTIME_CONFIG_JAR, Constants.Files.ARGUMENTS); + try (Reader reader = Files.newReader(argsFile, StandardCharsets.UTF_8)) { + return ArgumentsCodec.decode(reader); + } } @Override diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java index cf6c3b22..dae6140a 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java @@ -124,10 +124,10 @@ protected void doStartUp() { LOG.info("Application {} with id {} submitted", appName, appId); YarnApplicationState state = report.getYarnApplicationState(); - Stopwatch stopWatch = new Stopwatch().start(); + Stopwatch stopWatch = Stopwatch.createStarted(); LOG.debug("Checking yarn application status for {} {}", appName, appId); - while (!hasRun(state) && stopWatch.elapsedTime(startTimeoutUnit) < startTimeout) { + while (!hasRun(state) && stopWatch.elapsed(startTimeoutUnit) < startTimeout) { report = processController.getReport(); state = report.getYarnApplicationState(); LOG.debug("Yarn application status for {} {}: {}", appName, appId, state); @@ -168,13 +168,13 @@ protected synchronized void doShutDown() { FinalApplicationStatus finalStatus; // Poll application status from yarn try (ProcessController processController = this.processController) { - Stopwatch stopWatch = new Stopwatch().start(); + Stopwatch stopWatch = Stopwatch.createStarted(); YarnApplicationReport report = processController.getReport(); finalStatus = report.getFinalApplicationStatus(); ApplicationId appId = report.getApplicationId(); while (finalStatus == FinalApplicationStatus.UNDEFINED && - stopWatch.elapsedTime(TimeUnit.MILLISECONDS) < timeoutMillis) { + stopWatch.elapsed(TimeUnit.MILLISECONDS) < timeoutMillis) { LOG.debug("Yarn application final status for {} {}: {}", appName, appId, finalStatus); TimeUnit.SECONDS.sleep(1); finalStatus = processController.getReport().getFinalApplicationStatus(); diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index c67406a4..79361f4b 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -35,7 +35,6 @@ import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.ByteStreams; -import com.google.common.io.OutputSupplier; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -447,11 +446,12 @@ public ProcessController call() throws Exception { YarnTwillController controller = controllerFactory.create(runId, isLogCollectionEnabled(), logHandlers, submitTask, timeout, timeoutUnit); - controller.start(); + controller.startAsync(); return controller; } catch (Exception e) { LOG.error("Failed to submit application {}", twillSpec.getName(), e); - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -605,7 +605,7 @@ private void createApplicationJar(final ApplicationBundler bundler, List classList = classes.stream().map(Class::getName).sorted().collect(Collectors.toList()); Hasher hasher = Hashing.md5().newHasher(); for (String name : classList) { - hasher.putString(name); + hasher.putString(name, StandardCharsets.UTF_8); } // Only depends on class list so that it can be reused across different launches String name = hasher.hash().toString() + "-" + Constants.Files.APPLICATION_JAR; @@ -836,12 +836,9 @@ public String apply(String options) { private void saveArguments(Arguments arguments, final Path targetPath) throws IOException { LOG.debug("Creating {}", targetPath); - ArgumentsCodec.encode(arguments, new OutputSupplier() { - @Override - public Writer getOutput() throws IOException { - return Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8); - } - }); + try (Writer writer = Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8)) { + ArgumentsCodec.encode(arguments, writer); + } LOG.debug("Done {}", targetPath); } diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java index 15902f26..cf62e99d 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java @@ -180,12 +180,12 @@ protected void shutDown() throws Exception { @Override public void start() { - serviceDelegate.startAndWait(); + serviceDelegate.startAsync().awaitRunning(); } @Override public void stop() { - serviceDelegate.stopAndWait(); + serviceDelegate.stopAsync().awaitTerminated(); } /** @@ -347,7 +347,7 @@ public Iterable lookupLive() { } private void startUp() throws Exception { - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); // Create the root node, so that the namespace root would get created if it is missing // If the exception is caused by node exists, then it's ok. Otherwise propagate the exception. @@ -431,7 +431,7 @@ private LocationCacheCleaner startLocationCacheCleaner(final Location cacheBase, return !activeLocations.contains(location); }); - cleaner.startAndWait(); + cleaner.startAsync().awaitRunning(); return cleaner; } @@ -442,14 +442,14 @@ private void shutDown() throws Exception { // daemon threads. synchronized (this) { if (locationCacheCleaner != null) { - locationCacheCleaner.stopAndWait(); + locationCacheCleaner.stopAsync().awaitTerminated(); } if (secureStoreScheduler != null) { secureStoreScheduler.shutdownNow(); } } watchCancellable.cancel(); - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } private Cancellable watchLiveApps() { @@ -600,7 +600,7 @@ public void onSuccess(NodeData result) { YarnTwillController controller = listenController( new YarnTwillController(appName, runId, zkClient, amLiveNodeData, yarnAppClient)); controllers.put(appName, runId, controller); - controller.start(); + controller.startAsync(); } } } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java index 4c7d84b4..e5a39321 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java @@ -132,8 +132,8 @@ public boolean waitForSize(Iterable iterable, int count, int limit) throw * @throws Exception if the task through exception or timeout. */ public void waitFor(T expected, Callable callable, long timeout, long delay, TimeUnit unit) throws Exception { - Stopwatch stopwatch = new Stopwatch().start(); - while (callable.call() != expected && stopwatch.elapsedTime(unit) < timeout) { + Stopwatch stopwatch = Stopwatch.createStarted(); + while (callable.call() != expected && stopwatch.elapsed(unit) < timeout) { unit.sleep(delay); } } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java index 278a4363..ecc89bc1 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java @@ -161,7 +161,7 @@ public void run() { @Test public void testZKCleanup() throws Exception { final ZKClientService zkClient = ZKClientService.Builder.of(getZKConnectionString() + "/twill").build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try { TwillRunner runner = getTwillRunner(); @@ -222,7 +222,7 @@ public Stat call() throws Exception { }, 10000, 100, TimeUnit.MILLISECONDS); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -239,7 +239,7 @@ public Stat call() throws Exception { private ResourceReport waitForAfterRestartResourceReport(TwillController controller, String runnable, long timeout, TimeUnit timeoutUnit, int numOfResources, @Nullable Map instanceIdToContainerId) { - Stopwatch stopwatch = new Stopwatch(); + Stopwatch stopwatch = Stopwatch.createStarted(); stopwatch.start(); do { ResourceReport report = controller.getResourceReport(); @@ -271,7 +271,7 @@ private ResourceReport waitForAfterRestartResourceReport(TwillController control } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } - } while (stopwatch.elapsedTime(timeoutUnit) < timeout); + } while (stopwatch.elapsed(timeoutUnit) < timeout); LOG.error("Unable to get different container ids for restart."); return null; diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java index bd2b245f..34128d8c 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EventHandlerTestRun.java @@ -105,8 +105,8 @@ public void testKilled() throws IOException, InterruptedException, TimeoutExcept .start(); // Wait for the runnable to run and create runFile within 120 secs File runFile = new File(parentFolder, RUN_FILE); - Stopwatch stopwatch = new Stopwatch().start(); - while (!runFile.exists() && stopwatch.elapsedTime(TimeUnit.SECONDS) < 120) { + Stopwatch stopwatch = Stopwatch.createStarted(); + while (!runFile.exists() && stopwatch.elapsed(TimeUnit.SECONDS) < 120) { TimeUnit.SECONDS.sleep(1); } Assert.assertTrue(runFile.exists()); diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java index 43787f0d..c6c7c029 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java @@ -252,9 +252,9 @@ private void waitForLogLevel(TwillController controller, String runnable, long t Map expectedArgs, int expectedInstances) throws InterruptedException { - Stopwatch stopwatch = new Stopwatch(); + Stopwatch stopwatch = Stopwatch.createStarted(); stopwatch.start(); - while (stopwatch.elapsedTime(timeoutUnit) < timeout) { + while (stopwatch.elapsed(timeoutUnit) < timeout) { ResourceReport report = controller.getResourceReport(); if (report == null || report.getRunnableResources(runnable) == null) { diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java index 717a80f0..8706e83d 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java @@ -161,7 +161,7 @@ public void run() { private boolean waitForLogLevel(TwillController controller, String runnable, long timeout, TimeUnit timeoutUnit, @Nullable LogEntry.Level expected) throws InterruptedException { - Stopwatch stopwatch = new Stopwatch(); + Stopwatch stopwatch = Stopwatch.createStarted(); stopwatch.start(); do { ResourceReport report = controller.getResourceReport(); @@ -175,7 +175,7 @@ private boolean waitForLogLevel(TwillController controller, String runnable, lon } } TimeUnit.MILLISECONDS.sleep(100); - } while (stopwatch.elapsedTime(timeoutUnit) < timeout); + } while (stopwatch.elapsed(timeoutUnit) < timeout); return false; } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java index a61880fe..8a0ab7c4 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java @@ -306,8 +306,8 @@ public Iterator iterator() { private ResourceReport getResourceReport(TwillController controller, long timeoutMillis) { ResourceReport report = controller.getResourceReport(); - Stopwatch stopwatch = new Stopwatch(); - while (report == null && stopwatch.elapsedMillis() < timeoutMillis) { + Stopwatch stopwatch = Stopwatch.createStarted(); + while (report == null && stopwatch.elapsed().toMillis() < timeoutMillis) { Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); report = controller.getResourceReport(); } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java index 6dcec8f6..48b574fa 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java @@ -284,7 +284,7 @@ public void testRestartRunnable() throws Exception { private void waitForContainers(TwillController controller, int count, long timeout, TimeUnit timeoutUnit) throws Exception { - Stopwatch stopwatch = new Stopwatch(); + Stopwatch stopwatch = Stopwatch.createStarted(); stopwatch.start(); int yarnContainers = 0; int twillContainers = 0; @@ -298,7 +298,7 @@ private void waitForContainers(TwillController controller, int count, long timeo } } TimeUnit.SECONDS.sleep(1); - } while (stopwatch.elapsedTime(timeoutUnit) < timeout); + } while (stopwatch.elapsed(timeoutUnit) < timeout); throw new TimeoutException("Timeout reached while waiting for num containers to be " + count + ". Yarn containers = " + yarnContainers + ", Twill containers = " + twillContainers); @@ -306,7 +306,7 @@ private void waitForContainers(TwillController controller, int count, long timeo private void waitForInstance(TwillController controller, String runnable, String yarnInstanceId, long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException { - Stopwatch stopwatch = new Stopwatch(); + Stopwatch stopwatch = Stopwatch.createStarted(); stopwatch.start(); do { ResourceReport report = controller.getResourceReport(); @@ -318,7 +318,7 @@ private void waitForInstance(TwillController controller, String runnable, String } } TimeUnit.SECONDS.sleep(1); - } while (stopwatch.elapsedTime(timeoutUnit) < timeout); + } while (stopwatch.elapsed(timeoutUnit) < timeout); throw new TimeoutException("Timeout reached while waiting for runnable " + runnable + " instance " + yarnInstanceId); diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java index 74cf80e1..e2def33e 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java @@ -89,7 +89,7 @@ private boolean expireAppMasterZKSession(TwillController controller, long timeou MBeanServer mbeanServer = MBeanRegistry.getInstance().getPlatformMBeanServer(); QueryExp query = Query.isInstanceOf(new StringValueExp(ConnectionMXBean.class.getName())); - Stopwatch stopwatch = new Stopwatch(); + Stopwatch stopwatch = Stopwatch.createStarted(); stopwatch.start(); do { // Find the AM session and expire it @@ -108,7 +108,7 @@ private boolean expireAppMasterZKSession(TwillController controller, long timeou } } Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - } while (stopwatch.elapsedTime(timeoutUnit) < timeout); + } while (stopwatch.elapsed(timeoutUnit) < timeout); return false; } diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java index 407d519a..05a988e4 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java @@ -111,7 +111,7 @@ protected void before() throws Throwable { // Starts Zookeeper zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); // Start YARN mini cluster File miniDFSDir = tmpFolder.newFolder(); @@ -241,7 +241,7 @@ public ApplicationResourceUsageReport getApplicationResourceReport(String appId) private void stopQuietly(Service service) { try { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } catch (Exception e) { LOG.warn("Failed to stop service {}.", service, e); } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java index dc2bfa99..f50b7af7 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java @@ -27,7 +27,9 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; +import java.util.concurrent.TimeoutException; import org.apache.twill.common.Cancellable; import org.apache.twill.common.Threads; import org.apache.twill.zookeeper.ACLData; @@ -189,14 +191,14 @@ public void onFailure(Throwable t) { // handle the failure updateFailureResult(t, result, path, ignoreNodeExists); } - }); + }, MoreExecutors.directExecutor()); } @Override public void onFailure(Throwable t) { result.setException(t); } - }); + }, MoreExecutors.directExecutor()); } /** @@ -236,7 +238,7 @@ private String getParent(String path) { String parentPath = path.substring(0, path.lastIndexOf('/')); return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath; } - }); + }, MoreExecutors.directExecutor()); return result; } @@ -302,13 +304,8 @@ public ZooKeeper get() { } @Override - public ListenableFuture start() { - return serviceDelegate.start(); - } - - @Override - public State startAndWait() { - return serviceDelegate.startAndWait(); + public Service startAsync() { + return serviceDelegate.startAsync(); } @Override @@ -322,13 +319,33 @@ public State state() { } @Override - public ListenableFuture stop() { - return serviceDelegate.stop(); + public Service stopAsync() { + return serviceDelegate.stopAsync(); + } + + @Override + public void awaitRunning() { + serviceDelegate.awaitRunning(); + } + + @Override + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + serviceDelegate.awaitRunning(timeout, unit); + } + + @Override + public void awaitTerminated() { + serviceDelegate.awaitTerminated(); + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + serviceDelegate.awaitTerminated(timeout, unit); } @Override - public State stopAndWait() { - return serviceDelegate.stopAndWait(); + public Throwable failureCause() { + return serviceDelegate.failureCause(); } @Override diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java index 73ee3088..a8fc8cda 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/FailureRetryZKClient.java @@ -20,6 +20,7 @@ import com.google.common.base.Supplier; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.twill.common.Threads; import org.apache.twill.zookeeper.ACLData; import org.apache.twill.zookeeper.ForwardingZKClient; @@ -74,7 +75,7 @@ public OperationFuture create(final String path, @Nullable final byte[] public OperationFuture get() { return FailureRetryZKClient.super.create(path, data, createMode, createParent, acl); } - })); + }), MoreExecutors.directExecutor()); return result; } @@ -88,7 +89,7 @@ public OperationFuture exists(final String path, final Watcher watcher) { public OperationFuture get() { return FailureRetryZKClient.super.exists(path, watcher); } - })); + }), MoreExecutors.directExecutor()); return result; } @@ -104,7 +105,7 @@ public OperationFuture getChildren(final String path, final Watche public OperationFuture get() { return FailureRetryZKClient.super.getChildren(path, watcher); } - })); + }), MoreExecutors.directExecutor()); return result; } @@ -118,7 +119,7 @@ public OperationFuture getData(final String path, final Watcher watche public OperationFuture get() { return FailureRetryZKClient.super.getData(path, watcher); } - })); + }), MoreExecutors.directExecutor()); return result; } @@ -132,7 +133,7 @@ public OperationFuture setData(final String dataPath, final byte[] data, f public OperationFuture get() { return FailureRetryZKClient.super.setData(dataPath, data, version); } - })); + }), MoreExecutors.directExecutor()); return result; } @@ -148,7 +149,7 @@ public OperationFuture delete(final String deletePath, final int version public OperationFuture get() { return FailureRetryZKClient.super.delete(deletePath, version); } - })); + }), MoreExecutors.directExecutor()); return result; } @@ -162,7 +163,7 @@ public OperationFuture getACL(final String path) { public OperationFuture get() { return FailureRetryZKClient.super.getACL(path); } - })); + }), MoreExecutors.directExecutor()); return result; } @@ -176,7 +177,7 @@ public OperationFuture setACL(final String path, final Iterable acl, public OperationFuture get() { return FailureRetryZKClient.super.setACL(path, acl, version); } - })); + }), MoreExecutors.directExecutor()); return result; } @@ -230,7 +231,8 @@ private boolean doRetry(Throwable t) { SCHEDULER.schedule(new Runnable() { @Override public void run() { - Futures.addCallback(retryAction.get(), OperationFutureCallback.this); + Futures.addCallback(retryAction.get(), OperationFutureCallback.this, + MoreExecutors.directExecutor()); } }, nextRetry, TimeUnit.MILLISECONDS); diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java index d18d5edf..dcc2c6e7 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java @@ -18,10 +18,14 @@ package org.apache.twill.internal.zookeeper; import com.google.common.base.Preconditions; -import com.google.common.io.Files; + import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; +import java.io.IOException; +import java.nio.file.Files; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; @@ -79,7 +83,12 @@ public static Builder builder() { private InMemoryZKServer(File dataDir, int tickTime, boolean autoClean, int port) { if (dataDir == null) { - dataDir = Files.createTempDir(); + try { + // Updated to use standard Java NIO, as Guava's Files.createTempDir is deprecated + dataDir = Files.createTempDirectory("twill-zk").toFile(); + } catch (IOException e) { + throw new RuntimeException("Failed to create temp directory", e); + } autoClean = true; } else { Preconditions.checkArgument(dataDir.isDirectory() || dataDir.mkdirs() || dataDir.isDirectory()); @@ -123,13 +132,9 @@ private void cleanDir(File dir) { } @Override - public ListenableFuture start() { - return delegateService.start(); - } - - @Override - public State startAndWait() { - return delegateService.startAndWait(); + public Service startAsync() { + delegateService.startAsync(); + return this; } @Override @@ -143,13 +148,34 @@ public State state() { } @Override - public ListenableFuture stop() { - return delegateService.stop(); + public Service stopAsync() { + delegateService.stopAsync(); + return this; + } + + @Override + public void awaitRunning() { + delegateService.awaitRunning(); + } + + @Override + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + delegateService.awaitRunning(timeout, unit); + } + + @Override + public void awaitTerminated() { + delegateService.awaitTerminated(); + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + delegateService.awaitTerminated(timeout, unit); } @Override - public State stopAndWait() { - return delegateService.stopAndWait(); + public Throwable failureCause() { + return delegateService.failureCause(); } @Override diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java index d8bb49d1..30f1728a 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/LeaderElection.java @@ -234,7 +234,7 @@ private void becomeLeader() { handler.leader(); } catch (Throwable t) { LOG.warn("Exception thrown when calling leader() method. Withdraw from the leader election process.", t); - stop(); + stopAsync(); } } @@ -245,7 +245,7 @@ private void becomeFollower() { handler.follower(); } catch (Throwable t) { LOG.warn("Exception thrown when calling follower() method. Withdraw from the leader election process.", t); - stop(); + stopAsync(); } } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java index 239a6560..6c41a693 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/NamespaceZKClient.java @@ -135,7 +135,7 @@ public void onSuccess(V result) { public void onFailure(Throwable t) { to.setException(t); } - }); + }, Threads.SAME_THREAD_EXECUTOR); return to; } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java index c45db7a5..c10c0f56 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLock.java @@ -81,7 +81,8 @@ public void lock() { acquire(false, true); } catch (Exception e) { lock.unlock(); - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -92,8 +93,9 @@ public void lockInterruptibly() throws InterruptedException { acquire(true, true); } catch (Exception e) { lock.unlock(); - Throwables.propagateIfInstanceOf(e, InterruptedException.class); - throw Throwables.propagate(e); + Throwables.throwIfInstanceOf(e, InterruptedException.class); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -110,7 +112,8 @@ public boolean tryLock() { return false; } catch (Exception e) { lock.unlock(); - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -129,7 +132,8 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } catch (ExecutionException e) { lock.unlock(); - throw Throwables.propagate(e.getCause()); + Throwables.throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); } catch (TimeoutException e) { lock.unlock(); return false; @@ -148,7 +152,8 @@ public void unlock() { try { Uninterruptibles.getUninterruptibly(zkClient.delete(localLockNode.get())); } catch (ExecutionException e) { - throw Throwables.propagate(e.getCause()); + Throwables.throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); } finally { localLockNode.remove(); } @@ -176,7 +181,8 @@ private boolean acquire(boolean interruptible, boolean waitForLock) throws Inter return acquire(interruptible, waitForLock, Long.MAX_VALUE, TimeUnit.SECONDS); } catch (TimeoutException e) { // Should never happen - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -258,7 +264,7 @@ public void onFailure(Throwable t) { completion.setException(t); } } - }); + }, Threads.SAME_THREAD_EXECUTOR); // Gets the result from the completion try { @@ -353,7 +359,7 @@ public void onFailure(Throwable t) { completion.setException(t); } } - }); + }, Threads.SAME_THREAD_EXECUTOR); } @Override @@ -364,7 +370,7 @@ public void onFailure(Throwable t) { doAcquire(completion, waitForLock, guid, null); } } - }); + }, Threads.SAME_THREAD_EXECUTOR); } /** diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java index 776efe4a..769a1dd3 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireWatcher.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.twill.zookeeper.NodeChildren; import org.apache.twill.zookeeper.NodeData; import org.apache.twill.zookeeper.ZKClient; @@ -122,7 +123,7 @@ public void onFailure(Throwable t) { LOG.error("Fail to re-set watch on exists for path " + path, t); } } - }); + }, MoreExecutors.directExecutor()); } private void children() { @@ -168,7 +169,7 @@ public void onFailure(Throwable t) { } LOG.error("Fail to re-set watch on getChildren for path " + path, t); } - }); + }, MoreExecutors.directExecutor()); } private void data() { @@ -202,6 +203,6 @@ public void onFailure(Throwable t) { } LOG.error("Fail to re-set watch on getData for path " + path, t); } - }); + }, MoreExecutors.directExecutor()); } } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java index ed0e0bd5..708cc78d 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/RewatchOnExpireZKClient.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.twill.internal.zookeeper.RewatchOnExpireWatcher.ActionType; import org.apache.twill.zookeeper.ForwardingZKClient; import org.apache.twill.zookeeper.NodeChildren; @@ -55,7 +56,7 @@ public void onSuccess(Stat result) { public void onFailure(Throwable t) { // No-op } - }); + }, MoreExecutors.directExecutor()); return result; } @@ -76,7 +77,7 @@ public void onSuccess(NodeChildren result) { public void onFailure(Throwable t) { // No-op } - }); + }, MoreExecutors.directExecutor()); return result; } @@ -97,7 +98,7 @@ public void onSuccess(NodeData result) { public void onFailure(Throwable t) { // No-op } - }); + }, MoreExecutors.directExecutor()); return result; } diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java index 10391b2d..469e6b28 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ForwardingZKClientService.java @@ -18,11 +18,11 @@ package org.apache.twill.zookeeper; import com.google.common.base.Supplier; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.zookeeper.ZooKeeper; - +import com.google.common.util.concurrent.Service; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.zookeeper.ZooKeeper; /** * @@ -42,13 +42,9 @@ public Supplier getZooKeeperSupplier() { } @Override - public ListenableFuture start() { - return delegate.start(); - } - - @Override - public State startAndWait() { - return Futures.getUnchecked(start()); + public Service startAsync() { + delegate.startAsync(); + return this; } @Override @@ -62,13 +58,34 @@ public State state() { } @Override - public ListenableFuture stop() { - return delegate.stop(); + public Service stopAsync() { + delegate.stopAsync(); + return this; + } + + @Override + public void awaitRunning() { + delegate.awaitRunning(); + } + + @Override + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + delegate.awaitRunning(timeout, unit); + } + + @Override + public void awaitTerminated() { + delegate.awaitTerminated(); + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + delegate.awaitTerminated(timeout, unit); } @Override - public State stopAndWait() { - return Futures.getUnchecked(stop()); + public Throwable failureCause() { + return delegate.failureCause(); } @Override diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java index bce63914..1e7ef176 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import org.apache.twill.common.Cancellable; import org.apache.twill.common.Threads; @@ -148,7 +149,7 @@ public void onSuccess(Stat result) { public void onFailure(Throwable t) { completion.setException(t); } - }); + }, MoreExecutors.directExecutor()); } public static Cancellable watchChildren(final ZKClient zkClient, String path, ChildrenCallback callback) { @@ -378,7 +379,7 @@ public void onSuccess(Stat result) { public void onFailure(Throwable t) { completion.setException(t); } - }); + }, MoreExecutors.directExecutor()); } private static void watchChanges(final Operation operation, final String path, @@ -419,7 +420,7 @@ public void run() { } LOG.error("Failed to watch data for path " + path + " " + t, t); } - }); + }, MoreExecutors.directExecutor()); } private ZKOperations() { diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java index 2d4b5d51..53e4324a 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java @@ -72,7 +72,7 @@ public void testElection() throws ExecutionException, InterruptedException, Brok final AtomicInteger currentLeader = new AtomicInteger(-1); for (int i = 0; i < participantCount; i++) { final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); stopLatch[i] = new CountDownLatch(1); zkClients.add(zkClient); @@ -95,10 +95,10 @@ public void follower() { followerSem.release(); } }); - leaderElection.start(); + leaderElection.startAsync(); stopLatch[idx].await(10, TimeUnit.SECONDS); - leaderElection.stopAndWait(); + leaderElection.stopAsync().awaitTerminated(); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -125,7 +125,7 @@ public void follower() { executor.awaitTermination(5L, TimeUnit.SECONDS); for (ZKClientService zkClient : zkClients) { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } } @@ -143,7 +143,7 @@ public void testCancel() throws InterruptedException, IOException { try { for (int i = 0; i < 2; i++) { ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); zkClients.add(zkClient); @@ -163,7 +163,7 @@ public void follower() { } for (LeaderElection leaderElection : leaderElections) { - leaderElection.start(); + leaderElection.startAsync(); } leaderSem.tryAcquire(10, TimeUnit.SECONDS); @@ -177,7 +177,7 @@ public void follower() { zkClients.get(follower).getConnectString(), 20000); // Cancel the leader - leaderElections.get(leader).stopAndWait(); + leaderElections.get(leader).stopAsync().awaitTerminated(); // Now follower should still be able to become leader. leaderSem.tryAcquire(30, TimeUnit.SECONDS); @@ -197,19 +197,19 @@ public void follower() { followerSem.release(); } })); - leaderElections.get(follower).start(); + leaderElections.get(follower).startAsync(); // Cancel the follower first. - leaderElections.get(follower).stopAndWait(); + leaderElections.get(follower).stopAsync().awaitTerminated(); // Cancel the leader. - leaderElections.get(leader).stopAndWait(); + leaderElections.get(leader).stopAsync().awaitTerminated(); // Since the follower has been cancelled before leader, there should be no leader. Assert.assertFalse(leaderSem.tryAcquire(10, TimeUnit.SECONDS)); } finally { for (ZKClientService zkClient : zkClients) { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } } @@ -218,10 +218,10 @@ public void follower() { public void testDisconnect() throws IOException, InterruptedException { File zkDataDir = tmpFolder.newFolder(); InMemoryZKServer ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).build(); - ownZKServer.startAndWait(); + ownZKServer.startAsync().awaitRunning(); try { ZKClientService zkClient = ZKClientService.Builder.of(ownZKServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try { final Semaphore leaderSem = new Semaphore(0); @@ -238,44 +238,44 @@ public void follower() { followerSem.release(); } }); - leaderElection.start(); + leaderElection.startAsync(); leaderSem.tryAcquire(20, TimeUnit.SECONDS); int zkPort = ownZKServer.getLocalAddress().getPort(); // Disconnect by shutting the server and restart it on the same port - ownZKServer.stopAndWait(); + ownZKServer.stopAsync().awaitTerminated(); // Right after disconnect, it should become follower followerSem.tryAcquire(20, TimeUnit.SECONDS); ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).setPort(zkPort).build(); - ownZKServer.startAndWait(); + ownZKServer.startAsync().awaitRunning(); // Right after reconnect, it should be leader again. leaderSem.tryAcquire(20, TimeUnit.SECONDS); // Now disconnect it again, but then cancel it before reconnect, it shouldn't become leader - ownZKServer.stopAndWait(); + ownZKServer.stopAsync().awaitTerminated(); // Right after disconnect, it should become follower followerSem.tryAcquire(20, TimeUnit.SECONDS); - ListenableFuture cancelFuture = leaderElection.stop(); + leaderElection.stopAsync(); ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).setPort(zkPort).build(); - ownZKServer.startAndWait(); + ownZKServer.startAsync().awaitRunning(); - Futures.getUnchecked(cancelFuture); + leaderElection.awaitTerminated(); // After reconnect, it should not be leader Assert.assertFalse(leaderSem.tryAcquire(10, TimeUnit.SECONDS)); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } finally { - ownZKServer.stopAndWait(); + ownZKServer.stopAsync().awaitTerminated(); } } @@ -289,7 +289,7 @@ public void testRace() throws InterruptedException { // This is to test the case when a follower tries to watch for leader node, but the leader is already gone for (int i = 0; i < 2; i++) { final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); executor.execute(new Runnable() { @Override public void run() { @@ -308,13 +308,13 @@ public void follower() { // no-op } }); - election.startAndWait(); + election.startAsync().awaitRunning(); Uninterruptibles.awaitUninterruptibly(leaderLatch); - election.stopAndWait(); + election.stopAsync().awaitTerminated(); } completeLatch.countDown(); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } }); @@ -330,11 +330,11 @@ public void follower() { @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); } @AfterClass public static void finish() { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLockTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLockTest.java index a617b1de..b130a345 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLockTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/ReentrantDistributedLockTest.java @@ -49,12 +49,12 @@ public class ReentrantDistributedLockTest { @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); } @AfterClass public static void finish() { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } @Test(timeout = 20000) @@ -74,7 +74,7 @@ public void testReentrant() { lock.unlock(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -111,7 +111,7 @@ public void run() { t.join(); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -160,8 +160,8 @@ public void run() { Assert.assertTrue(lockAcquired.await(5, TimeUnit.SECONDS)); t.join(); } finally { - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } } @@ -201,7 +201,7 @@ public void run() { lock.unlock(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -253,8 +253,8 @@ public void run() { lock2.unlock(); } finally { - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } } @@ -310,7 +310,7 @@ public void run() { Assert.assertTrue(lock.tryLock()); lock.unlock(); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -368,8 +368,8 @@ public void run() { Assert.assertTrue(lock1.tryLock()); lock1.unlock(); } finally { - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } } @@ -418,8 +418,8 @@ public void run() { Assert.assertTrue(lockLatch.await(30, TimeUnit.SECONDS)); } finally { - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } } @@ -470,14 +470,14 @@ public void run() { } } finally { - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } } private ZKClientService createZKClient() { ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); return zkClient; } diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java index b9cb8a44..e79a4ec0 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKClientTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.twill.internal.zookeeper.InMemoryZKServer; import org.apache.twill.internal.zookeeper.KillZKSession; import org.apache.zookeeper.CreateMode; @@ -66,11 +67,11 @@ public class ZKClientTest { @Test public void testChroot() throws Exception { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr() + "/chroot").build(); - client.startAndWait(); + client.startAsync().awaitRunning(); try { List> futures = Lists.newArrayList(); futures.add(client.create("/test1/test2", null, CreateMode.PERSISTENT)); @@ -81,21 +82,21 @@ public void testChroot() throws Exception { Assert.assertNotNull(client.exists("/test1/test3").get()); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @Test public void testCreateParent() throws ExecutionException, InterruptedException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - client.startAndWait(); + client.startAsync().awaitRunning(); try { String path = client.create("/test1/test2/test3/test4/test5", @@ -109,21 +110,21 @@ public void testCreateParent() throws ExecutionException, InterruptedException { } Assert.assertTrue(Arrays.equals("testing".getBytes(), client.getData(path).get().getData())); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @Test public void testGetChildren() throws ExecutionException, InterruptedException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - client.startAndWait(); + client.startAsync().awaitRunning(); try { client.create("/test", null, CreateMode.PERSISTENT).get(); @@ -138,21 +139,21 @@ public void testGetChildren() throws ExecutionException, InterruptedException { Assert.assertEquals(ImmutableSet.of("c1", "c2"), ImmutableSet.copyOf(nodeChildren.getChildren())); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @Test public void testSetData() throws ExecutionException, InterruptedException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - client.startAndWait(); + client.startAsync().awaitRunning(); client.create("/test", null, CreateMode.PERSISTENT).get(); Assert.assertNull(client.getData("/test").get().getData()); @@ -161,14 +162,14 @@ public void testSetData() throws ExecutionException, InterruptedException { Assert.assertTrue(Arrays.equals("testing".getBytes(), client.getData("/test").get().getData())); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @Test public void testExpireRewatch() throws InterruptedException, IOException, ExecutionException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { final CountDownLatch expireReconnectLatch = new CountDownLatch(1); @@ -186,7 +187,7 @@ public void process(WatchedEvent event) { } } }).build())); - client.startAndWait(); + client.startAsync().awaitRunning(); try { final BlockingQueue events = new LinkedBlockingQueue<>(); @@ -203,7 +204,7 @@ public void onSuccess(Stat result) { public void onFailure(Throwable t) { LOG.error("Failed to call exists on /expireRewatch", t); } - }); + }, MoreExecutors.directExecutor()); } }); @@ -222,10 +223,10 @@ public void onFailure(Throwable t) { Assert.assertEquals(Watcher.Event.EventType.NodeDeleted, events.poll(60, TimeUnit.SECONDS)); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -233,7 +234,7 @@ public void onFailure(Throwable t) { public void testRetry() throws ExecutionException, InterruptedException, TimeoutException, IOException { File dataDir = tmpFolder.newFolder(); InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(dataDir).setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); int port = zkServer.getLocalAddress().getPort(); final CountDownLatch disconnectLatch = new CountDownLatch(1); @@ -248,9 +249,9 @@ public void process(WatchedEvent event) { }).build(), RetryStrategies.fixDelay(0, TimeUnit.SECONDS))); final CountDownLatch createLatch = new CountDownLatch(1); - client.startAndWait(); + client.startAsync().awaitRunning(); try { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); Assert.assertTrue(disconnectLatch.await(1, TimeUnit.SECONDS)); Futures.addCallback(client.create("/testretry/test", null, CreateMode.PERSISTENT), new FutureCallback() { @@ -263,7 +264,7 @@ public void onSuccess(String result) { public void onFailure(Throwable t) { t.printStackTrace(System.out); } - }); + }, MoreExecutors.directExecutor()); TimeUnit.SECONDS.sleep(2); zkServer = InMemoryZKServer.builder() @@ -272,21 +273,21 @@ public void onFailure(Throwable t) { .setPort(port) .setTickTime(1000) .build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { Assert.assertTrue(createLatch.await(10, TimeUnit.SECONDS)); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } @Test public void testACL() throws IOException, ExecutionException, InterruptedException, NoSuchAlgorithmException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { String userPass = "user:pass"; @@ -297,10 +298,10 @@ public void testACL() throws IOException, ExecutionException, InterruptedExcepti .of(zkServer.getConnectionStr()) .addAuthInfo("digest", userPass.getBytes()) .build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); ZKClientService noAuthClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - noAuthClient.startAndWait(); + noAuthClient.startAsync().awaitRunning(); // Create a node that is readable by all client, but admin for the creator @@ -335,11 +336,11 @@ public void testACL() throws IOException, ExecutionException, InterruptedExcepti // Write again with the non-auth client, now should succeed. noAuthClient.setData(path, "test2".getBytes()).get(); - noAuthClient.stopAndWait(); - zkClient.stopAndWait(); + noAuthClient.stopAsync().awaitTerminated(); + zkClient.stopAsync().awaitTerminated(); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -348,9 +349,9 @@ public void testDeadlock() throws IOException, InterruptedException { // This is to test deadlock bug as described in (TWILL-110) // This test has very high chance to get deadlock before the bug fix, hence failed with timeout. InMemoryZKServer zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { - for (int i = 0; i < 5000; i++) { + for (int i = 0; i < 100; i++) { final ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); zkClient.addConnectionWatcher(new Watcher() { @Override @@ -358,12 +359,12 @@ public void process(WatchedEvent event) { LOG.debug("Connection event: {}", event); } }); - zkClient.startAndWait(); - zkClient.stopAndWait(); + zkClient.startAsync().awaitRunning(); + zkClient.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } @@ -387,10 +388,10 @@ public void run() { serverThread.start(); ZKClientService zkClient = ZKClientService.Builder.of("localhost:" + serverSocket.getLocalPort()).build(); - zkClient.start(); + zkClient.startAsync(); Assert.assertTrue(connectLatch.await(10, TimeUnit.SECONDS)); - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); serverThread.interrupt(); } } @@ -398,13 +399,13 @@ public void run() { @Test public void testNamespace() throws ExecutionException, InterruptedException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService zkClient = ZKClientService.Builder .of(zkServer.getConnectionStr()) .build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); ZKClient zk = ZKClients.namespace(zkClient, "/test"); // Create the "/ should create the "/test" from the root @@ -446,7 +447,7 @@ public void testNamespace() throws ExecutionException, InterruptedException { // The namespace must be gone Assert.assertNull(zkClient.exists("/test").get()); } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } } diff --git a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java index 9518d6eb..14b15ca6 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/zookeeper/ZKOperationsTest.java @@ -34,11 +34,11 @@ public class ZKOperationsTest { @Test public void recursiveDelete() throws ExecutionException, InterruptedException, TimeoutException { InMemoryZKServer zkServer = InMemoryZKServer.builder().setTickTime(1000).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); try { ZKClientService client = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - client.startAndWait(); + client.startAsync().awaitRunning(); try { client.create("/test1/test10/test101", null, CreateMode.PERSISTENT).get(); @@ -54,10 +54,10 @@ public void recursiveDelete() throws ExecutionException, InterruptedException, T Assert.assertNull(client.exists("/test1").get(2, TimeUnit.SECONDS)); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } } From 8d30d89ea65a2325b45f514f67e503a5624c1f43 Mon Sep 17 00:00:00 2001 From: Dheeraj Kholia Date: Fri, 15 May 2026 15:17:24 +0530 Subject: [PATCH 2/2] Upgrading hadoop to 3.3.6 --- pom.xml | 67 +++++++++++++++++++ .../org/apache/twill/internal/Constants.java | 4 +- .../twill/internal/AbstractTwillService.java | 19 +++++- .../twill/internal/CompositeService.java | 9 +++ .../kafka/client/ZKBrokerService.java | 24 +++++-- .../twill/internal/logging/KafkaAppender.java | 29 ++++++-- .../apache/twill/internal/ControllerTest.java | 2 +- .../apache/twill/kafka/client/KafkaTest.java | 3 + twill-yarn/pom.xml | 5 ++ .../internal/yarn/Hadoop21YarnAMClient.java | 2 +- .../internal/yarn/Hadoop3YarnAMClient.java | 42 ++++++++++++ .../internal/yarn/Hadoop3YarnAppClient.java | 48 +++++++++++++ .../appmaster/ApplicationMasterMain.java | 3 + .../apache/twill/internal/yarn/YarnUtils.java | 43 ++++++------ .../twill/yarn/YarnTwillRunnerService.java | 6 +- .../apache/twill/yarn/EchoServerTestRun.java | 1 - .../twill/yarn/LogLevelChangeTestRun.java | 1 - .../apache/twill/yarn/LogLevelTestRun.java | 1 - .../apache/twill/yarn/MaxRetriesTestRun.java | 11 ++- .../twill/yarn/RestartRunnableTestRun.java | 2 - .../twill/yarn/SessionExpireTestRun.java | 1 - .../org/apache/twill/yarn/TwillTester.java | 4 ++ .../zookeeper/DefaultZKClientService.java | 6 +- .../internal/zookeeper/InMemoryZKServer.java | 8 ++- .../zookeeper/SettableOperationFuture.java | 6 +- .../zookeeper/LeaderElectionTest.java | 2 +- 26 files changed, 287 insertions(+), 62 deletions(-) create mode 100644 twill-yarn/src/main/hadoop3/org/apache/twill/internal/yarn/Hadoop3YarnAMClient.java create mode 100644 twill-yarn/src/main/hadoop3/org/apache/twill/internal/yarn/Hadoop3YarnAppClient.java diff --git a/pom.xml b/pom.xml index f6cc7169..f5d4580e 100644 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ 1.5 false 2.12.10 + 3.12.4 @@ -535,6 +536,40 @@ 2.6.5 + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.8 + + + add-source + generate-sources + + add-source + + + + src/main/hadoop21 + src/main/hadoop22 + src/main/hadoop23 + src/main/hadoop26 + + + + + + + + + + hadoop-3 + + 3.3.6 + 3.5.7 + 5.2.0 + true @@ -557,6 +592,7 @@ src/main/hadoop22 src/main/hadoop23 src/main/hadoop26 + src/main/hadoop3 @@ -623,11 +659,31 @@ + + io.netty + netty-common + ${netty.version} + io.netty netty-buffer ${netty.version} + + io.netty + netty-transport + ${netty.version} + + + io.netty + netty-resolver + ${netty.version} + + + io.netty + netty-codec + ${netty.version} + io.netty netty-codec-http @@ -638,6 +694,11 @@ netty-handler ${netty.version} + + io.netty + netty-transport-native-epoll + ${netty.version} + org.xerial.snappy snappy-java @@ -899,6 +960,12 @@ ${junit.version} test + + org.mockito + mockito-core + ${mockito.version} + test + org.unitils unitils-core diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java index 4135c9ad..8a25ebe8 100644 --- a/twill-common/src/main/java/org/apache/twill/internal/Constants.java +++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java @@ -25,9 +25,9 @@ public final class Constants { public static final String LOG_TOPIC = "log"; /** Maximum number of seconds for AM to start. */ - public static final int APPLICATION_MAX_START_SECONDS = 60; + public static final int APPLICATION_MAX_START_SECONDS = 180; /** Maximum number of seconds for AM to stop. */ - public static final int APPLICATION_MAX_STOP_SECONDS = 60; + public static final int APPLICATION_MAX_STOP_SECONDS = 180; public static final long PROVISION_TIMEOUT = 30000; diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java index ceb78ae7..a832f31d 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java @@ -94,6 +94,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic private final AtomicLong terminationTimeoutMillis; private ExecutorService messageCallbackExecutor; private Cancellable watcherCancellable; + private volatile Runnable stopMessageRemover; protected AbstractTwillService(final ZKClient zkClient, RunId runId) { this.zkClient = zkClient; @@ -203,6 +204,11 @@ protected final void shutDown() throws Exception { try { doStop(getTerminationTimeoutMillis(Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS)); } finally { + Runnable remover = stopMessageRemover; + if (remover != null) { + stopMessageRemover = null; + remover.run(); + } // Given at most 5 seconds to cleanup ZK nodes removeLiveNode().get(5, TimeUnit.SECONDS); LOG.info("Service {} with runId {} shutdown completed", serviceName(), runId.getId()); @@ -327,17 +333,26 @@ private boolean handleStopMessage(Message message, final Runnable messageRemover long timeoutMillis = SystemMessages.getTimeoutMillis(message.getCommand(), Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS); terminationTimeoutMillis.compareAndSet(-1L, timeoutMillis); + stopMessageRemover = messageRemover; addListener(new Listener() { @Override public void terminated(State from) { - messageRemover.run(); + Runnable remover = stopMessageRemover; + if (remover != null) { + stopMessageRemover = null; + remover.run(); + } } @Override public void failed(State from, Throwable failure) { LOG.error("Stop service failed upon STOP command", failure); - messageRemover.run(); + Runnable remover = stopMessageRemover; + if (remover != null) { + stopMessageRemover = null; + remover.run(); + } } }, MoreExecutors.directExecutor()); // Stop this service. diff --git a/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java b/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java index 659dc3c7..d0f16e7f 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/CompositeService.java @@ -57,6 +57,9 @@ protected void startUp() throws Exception { } catch (UncheckedExecutionException e) { failureCause = e.getCause(); break; + } catch (IllegalStateException e) { + failureCause = service.failureCause(); + break; } } @@ -98,6 +101,12 @@ private void stopAll() throws Exception { // Log for sub-sequence service shutdown error, as only the first failure cause will be thrown. LOG.warn("Failed to stop service {}", service, e); } + } catch (IllegalStateException e) { + if (failureCause == null) { + failureCause = service.failureCause(); + } else { + LOG.warn("Failed to stop service {}", service, e); + } } } diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java index de42b9bc..8d987634 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java @@ -82,6 +82,9 @@ public BrokerId apply(String input) { private static final Function BROKER_INFO_TO_ADDRESS = new Function() { @Override public String apply(BrokerInfo input) { + if (input == null) { + return null; + } return String.format("%s:%d", input.getHost(), input.getPort()); } }; @@ -140,7 +143,8 @@ public synchronized Iterable getBrokers() { } final SettableFuture readyFuture = SettableFuture.create(); - final AtomicReference> brokers = new AtomicReference<>(Collections.emptyList()); + final AtomicReference>> brokers = + new AtomicReference<>(Collections.>emptyList()); actOnExists(BROKER_IDS_PATH, new Runnable() { @@ -156,9 +160,7 @@ public void onSuccess(NodeChildren result) { // For each children node, get the BrokerInfo from the brokerInfo cache. brokers.set( ImmutableList.copyOf( - Iterables.transform( - brokerInfos.getAll(Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER)).values(), - Suppliers.supplierFunction()))); + brokerInfos.getAll(Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER)).values())); readyFuture.set(null); for (ListenerExecutor listener : listeners) { @@ -188,7 +190,7 @@ public void process(WatchedEvent event) { // If the ids node is deleted, clear the broker list and re-watch. // This could happen when the Kafka server is restarted and have the ZK node cleanup // The readyFuture for this call doesn't matter, as we don't need to block on anything - brokers.set(Collections.emptyList()); + brokers.set(Collections.>emptyList()); for (ListenerExecutor listener : listeners) { listener.changed(ZKBrokerService.this); } @@ -200,7 +202,12 @@ public void process(WatchedEvent event) { } }, readyFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS); - brokerList = this.>createSupplier(brokers); + brokerList = new Supplier>() { + @Override + public Iterable get() { + return Iterables.transform(brokers.get(), Suppliers.supplierFunction()); + } + }; try { readyFuture.get(); } catch (Exception e) { @@ -211,7 +218,7 @@ public void process(WatchedEvent event) { @Override public String getBrokerList() { - return Joiner.on(',').join(Iterables.transform(getBrokers(), BROKER_INFO_TO_ADDRESS)); + return Joiner.on(',').skipNulls().join(Iterables.transform(getBrokers(), BROKER_INFO_TO_ADDRESS)); } @Override @@ -253,6 +260,9 @@ public void onSuccess(NodeData result) { T value = decodeNodeData(result, resultType); resultValue.set(value); readyFuture.set(value); + for (ListenerExecutor listener : listeners) { + listener.changed(ZKBrokerService.this); + } } @Override diff --git a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java index 5404ffec..e03b8569 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java +++ b/twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java @@ -184,9 +184,18 @@ public void stop() { public void forceFlush() { try { - scheduler.submit(flushTask).get(2, TimeUnit.SECONDS); + scheduler.submit(new Runnable() { + @Override + public void run() { + try { + publishLogs(60L, TimeUnit.SECONDS); + } catch (Exception e) { + addError("Failed to push logs to Kafka during force flush.", e); + } + } + }).get(60, TimeUnit.SECONDS); } catch (Exception e) { - addError("Failed to force log flush in 2 seconds.", e); + addError("Failed to force log flush in 60 seconds.", e); } } @@ -206,9 +215,16 @@ protected void append(ILoggingEvent eventObject) { * @throws TimeoutException If timeout reached before publish completed. */ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutException { - List logs = Lists.newArrayListWithExpectedSize(bufferedSize.get()); - + List cachedLogs = Lists.newArrayListWithExpectedSize(bufferedSize.get()); for (String json : Iterables.consumingIterable(buffer)) { + cachedLogs.add(json); + } + if (cachedLogs.isEmpty()) { + return 0; + } + + List logs = Lists.newArrayListWithExpectedSize(cachedLogs.size()); + for (String json : cachedLogs) { logs.add(Charsets.UTF_8.encode(json)); } @@ -219,7 +235,6 @@ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutExcept try { Stopwatch stopwatch = Stopwatch.createStarted(); - stopwatch.start(); long publishTimeout = timeout; do { @@ -237,7 +252,11 @@ private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutExcept } while (publishTimeout > 0); } catch (InterruptedException e) { addWarn("Logs publish to Kafka interrupted.", e); + } catch (TimeoutException e) { + buffer.addAll(cachedLogs); + throw e; } + buffer.addAll(cachedLogs); return 0; } diff --git a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java index f639f6e9..3c9ecfc1 100644 --- a/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java +++ b/twill-core/src/test/java/org/apache/twill/internal/ControllerTest.java @@ -212,7 +212,7 @@ public ResourceReport getResourceReport() { return null; } }; - controller.startAsync().awaitTerminated(); + controller.startAsync().awaitRunning(); return controller; } } diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java index b689b9a1..670f748e 100644 --- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java +++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java @@ -350,6 +350,9 @@ private static Properties generateKafkaConfig(String zkConnectStr, File logDir) // Use a really small file size to force some flush to happen prop.setProperty("log.file.size", "1024"); prop.setProperty("log.default.flush.interval.ms", "1000"); + prop.setProperty("log.cleaner.enable", "false"); + prop.setProperty("log.cleaner.threads", "1"); + prop.setProperty("log.cleaner.dedupe.buffer.size", "10485760"); return prop; } } diff --git a/twill-yarn/pom.xml b/twill-yarn/pom.xml index 5c886f1d..7454b0b4 100644 --- a/twill-yarn/pom.xml +++ b/twill-yarn/pom.xml @@ -84,6 +84,11 @@ junit junit + + org.mockito + mockito-core + test + org.ow2.asm asm-commons diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java index e6b4d4b1..da5bd24a 100644 --- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java +++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java @@ -86,7 +86,7 @@ protected void startUp() throws Exception { trackerAddr.getPort(), trackerUrl.toString()); maxCapability = response.getMaximumResourceCapability(); - nmClient.startAsync().awaitTerminated(); + nmClient.startAsync().awaitRunning(); } @Override diff --git a/twill-yarn/src/main/hadoop3/org/apache/twill/internal/yarn/Hadoop3YarnAMClient.java b/twill-yarn/src/main/hadoop3/org/apache/twill/internal/yarn/Hadoop3YarnAMClient.java new file mode 100644 index 00000000..c2994fd4 --- /dev/null +++ b/twill-yarn/src/main/hadoop3/org/apache/twill/internal/yarn/Hadoop3YarnAMClient.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal.yarn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Wrapper class for AMRMClient for Hadoop version 3.3.6 or greater. + */ +public final class Hadoop3YarnAMClient extends Hadoop22YarnAMClient { + + private static final Logger LOG = LoggerFactory.getLogger(Hadoop3YarnAMClient.class); + + public Hadoop3YarnAMClient(Configuration conf) { + super(conf); + } + + @Override + protected final ContainerId containerIdLookup(String containerIdStr) { + return (ContainerId.fromString(containerIdStr)); + } +} diff --git a/twill-yarn/src/main/hadoop3/org/apache/twill/internal/yarn/Hadoop3YarnAppClient.java b/twill-yarn/src/main/hadoop3/org/apache/twill/internal/yarn/Hadoop3YarnAppClient.java new file mode 100644 index 00000000..f23cfbb6 --- /dev/null +++ b/twill-yarn/src/main/hadoop3/org/apache/twill/internal/yarn/Hadoop3YarnAppClient.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.twill.internal.yarn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.twill.api.Configs; + +/** + *

+ * The service implementation of {@link YarnAppClient} for Apache Hadoop 2.6 and beyond. + * + * The {@link VersionDetectYarnAppClientFactory} class will decide to return instance of this class for + * Apache Hadoop 2.6 and beyond. + *

+ */ +@SuppressWarnings("unused") +public class Hadoop3YarnAppClient extends Hadoop26YarnAppClient { + + public Hadoop3YarnAppClient(Configuration configuration) { + super(configuration); + } + + @Override + protected void configureAppSubmissionContext(ApplicationSubmissionContext context) { + super.configureAppSubmissionContext(context); + long interval = configuration.getLong(Configs.Keys.YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL, -1L); + if (interval > 0) { + context.setAttemptFailuresValidityInterval(interval); + } + } +} diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java index 01e45dd2..a6335b2e 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java @@ -208,6 +208,9 @@ private Properties generateKafkaConfig(String kafkaZKConnect) { // Setting it to lower value allow the AM to retry multiple times if race happens. prop.setProperty("zookeeper.connection.timeout.ms", "3000"); prop.setProperty("default.replication.factor", "1"); + prop.setProperty("log.cleaner.enable", "false"); + prop.setProperty("log.cleaner.threads", "1"); + prop.setProperty("log.cleaner.dedupe.buffer.size", "10485760"); return prop; } } diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java index b6454b98..d54cd930 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -75,29 +74,32 @@ public enum HadoopVersions { HADOOP_26 } - private static boolean hasDFSUtilClient = false; // use this to judge if the hadoop version is above 2.8 - - private static boolean hasHAUtilsClient = false; - private static Method getHaNnRpcAddressesMethod; private static Method cloneDelegationTokenForLogicalUriMethod; static { try { - Class dfsUtilsClientClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtilClient"); + Class dfsUtilsClientClazz; + try { + dfsUtilsClientClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtilClient"); + } catch (ClassNotFoundException e) { + dfsUtilsClientClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil"); + } getHaNnRpcAddressesMethod = dfsUtilsClientClazz.getMethod("getHaNnRpcAddresses", Configuration.class); - hasDFSUtilClient = true; - Class haUtilClientClazz = Class.forName("org.apache.hadoop.hdfs.HAUtilClient"); + + Class haUtilClientClazz; + try { + haUtilClientClazz = Class.forName("org.apache.hadoop.hdfs.HAUtilClient"); + } catch (ClassNotFoundException e) { + haUtilClientClazz = Class.forName("org.apache.hadoop.hdfs.HAUtil"); + } cloneDelegationTokenForLogicalUriMethod = haUtilClientClazz.getMethod( "cloneDelegationTokenForLogicalUri", UserGroupInformation.class, URI.class, Collection.class); - hasHAUtilsClient = true; - } catch (ClassNotFoundException e) { - LOG.debug("No such class", e); - } catch (NoSuchMethodException e) { - LOG.debug("No such method", e); + } catch (ClassNotFoundException | NoSuchMethodException e) { + LOG.debug("No such class or method", e); } } @@ -210,11 +212,7 @@ public static void cloneHaNnCredentials(Configuration config) throws IOException */ private static void cloneDelegationTokenForLogicalUri(UserGroupInformation ugi, URI haUri, Collection nnAddrs) { - if (hasHAUtilsClient) { - invokeStaticMethodWithExceptionHandled(cloneDelegationTokenForLogicalUriMethod, ugi, haUri, nnAddrs); - } else { - HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nnAddrs); - } + invokeStaticMethodWithExceptionHandled(cloneDelegationTokenForLogicalUriMethod, ugi, haUri, nnAddrs); } @@ -223,13 +221,10 @@ private static void cloneDelegationTokenForLogicalUri(UserGroupInformation ugi, * @param config * @return */ + @SuppressWarnings("unchecked") private static Map> getHaNnRpcAddresses(Configuration config) { - return hasDFSUtilClient ? getHaNnRpcAddressesUseDFSUtilClient(config) : - DFSUtil.getHaNnRpcAddresses(config); - } - - private static Map> getHaNnRpcAddressesUseDFSUtilClient(Configuration config) { - return (Map) invokeStaticMethodWithExceptionHandled(getHaNnRpcAddressesMethod, config); + return (Map>) invokeStaticMethodWithExceptionHandled( + getHaNnRpcAddressesMethod, config); } private static Object invokeStaticMethodWithExceptionHandled(Method method, Object ... args) { diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java index cf62e99d..d99411a5 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java @@ -180,7 +180,11 @@ protected void shutDown() throws Exception { @Override public void start() { - serviceDelegate.startAsync().awaitRunning(); + if (serviceDelegate.state() == Service.State.NEW) { + serviceDelegate.startAsync().awaitRunning(); + } else { + serviceDelegate.awaitRunning(); + } } @Override diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java index ecc89bc1..b8514602 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java @@ -240,7 +240,6 @@ private ResourceReport waitForAfterRestartResourceReport(TwillController control TimeUnit timeoutUnit, int numOfResources, @Nullable Map instanceIdToContainerId) { Stopwatch stopwatch = Stopwatch.createStarted(); - stopwatch.start(); do { ResourceReport report = controller.getResourceReport(); if (report == null || report.getRunnableResources(runnable) == null) { diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java index c6c7c029..2b5a945c 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelChangeTestRun.java @@ -253,7 +253,6 @@ private void waitForLogLevel(TwillController controller, String runnable, long t int expectedInstances) throws InterruptedException { Stopwatch stopwatch = Stopwatch.createStarted(); - stopwatch.start(); while (stopwatch.elapsed(timeoutUnit) < timeout) { ResourceReport report = controller.getResourceReport(); diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java index 8706e83d..58d2c9f1 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java @@ -162,7 +162,6 @@ private boolean waitForLogLevel(TwillController controller, String runnable, lon TimeUnit timeoutUnit, @Nullable LogEntry.Level expected) throws InterruptedException { Stopwatch stopwatch = Stopwatch.createStarted(); - stopwatch.start(); do { ResourceReport report = controller.getResourceReport(); if (report == null || report.getRunnableResources(runnable) == null) { diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/MaxRetriesTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/MaxRetriesTestRun.java index f178c89c..bef98b6b 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/MaxRetriesTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/MaxRetriesTestRun.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.twill.discovery.ServiceDiscovered; /** * Unit tests for RunningContainers class. @@ -82,7 +83,6 @@ public void maxRetriesWithIncreasedInstances() throws InterruptedException, Exec final int maxRetries = 3; final AtomicInteger retriesSeen = new AtomicInteger(0); final CountDownLatch retriesExhausted = new CountDownLatch(1); - final CountDownLatch allRunning = new CountDownLatch(1); // start with 2 instances ResourceSpecification resource = ResourceSpecification.Builder.with().setVirtualCores(1) @@ -99,22 +99,20 @@ public void onLog(LogEntry logEntry) { if (logEntry.getMessage().contains("Retries exhausted")) { retriesExhausted.countDown(); } - if (logEntry.getMessage().contains("fully provisioned with 2 instances")) { - allRunning.countDown(); - } } }).start(); try { // wait for initial instances to have started - allRunning.await(); + ServiceDiscovered discovered = controller.discoverService("failingInstance"); + Assert.assertTrue(waitForSize(discovered, 2, 120)); /* * now increase the number of instances. these should fail since there instance ids are > 1. afterwards, the * number of retries should be 3 since only this one instance failed. */ controller.changeInstances(FailingInstanceServer.class.getSimpleName(), 3); - retriesExhausted.await(); + Assert.assertTrue(retriesExhausted.await(120, TimeUnit.SECONDS)); Assert.assertEquals(3, retriesSeen.get()); } finally { @@ -148,6 +146,7 @@ public void run() { throw new RuntimeException("FAIL early FAIL often"); } else { LOG.info("Instance {} is running", getContext().getInstanceId()); + getContext().announce("failingInstance", 12345); while (true) { try { Thread.sleep(100); diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java index 48b574fa..172e9141 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/RestartRunnableTestRun.java @@ -285,7 +285,6 @@ public void testRestartRunnable() throws Exception { private void waitForContainers(TwillController controller, int count, long timeout, TimeUnit timeoutUnit) throws Exception { Stopwatch stopwatch = Stopwatch.createStarted(); - stopwatch.start(); int yarnContainers = 0; int twillContainers = 0; do { @@ -307,7 +306,6 @@ private void waitForContainers(TwillController controller, int count, long timeo private void waitForInstance(TwillController controller, String runnable, String yarnInstanceId, long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException { Stopwatch stopwatch = Stopwatch.createStarted(); - stopwatch.start(); do { ResourceReport report = controller.getResourceReport(); if (report != null && report.getRunnableResources(runnable) != null) { diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java index e2def33e..f672c1a9 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/SessionExpireTestRun.java @@ -90,7 +90,6 @@ private boolean expireAppMasterZKSession(TwillController controller, long timeou QueryExp query = Query.isInstanceOf(new StringValueExp(ConnectionMXBean.class.getName())); Stopwatch stopwatch = Stopwatch.createStarted(); - stopwatch.start(); do { // Find the AM session and expire it Set connectionBeans = mbeanServer.queryNames(ObjectName.WILDCARD, query); diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java index 05a988e4..786dba1b 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java @@ -118,6 +118,8 @@ protected void before() throws Throwable { LOG.info("Starting Mini DFS on path {}", miniDFSDir); Configuration fsConf = new HdfsConfiguration(new Configuration()); fsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, miniDFSDir.getAbsolutePath()); + fsConf.set("dfs.namenode.resource.du.reserved", "0"); + fsConf.set("dfs.namenode.safemode.threshold-pct", "0.0f"); for (Map.Entry entry : extraConfig.entrySet()) { fsConf.set(entry.getKey(), entry.getValue()); @@ -136,6 +138,8 @@ protected void before() throws Throwable { conf.set("yarn.nodemanager.vmem-check-enabled", "false"); conf.set("yarn.scheduler.minimum-allocation-mb", "128"); conf.set("yarn.nodemanager.delete.debug-delay-sec", "3600"); + conf.setBoolean("yarn.nodemanager.disk-health-checker.enable", false); + conf.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", "100.0"); conf.set(Configs.Keys.LOCAL_STAGING_DIRECTORY, tmpFolder.newFolder().getAbsolutePath()); diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java index f50b7af7..d505ed63 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java @@ -492,14 +492,14 @@ public void process(WatchedEvent event) { LOG.info("ZooKeeper session expired: {}", zkStr); // When connection expired, simply reconnect again - if (state != State.RUNNING) { + if (state != State.RUNNING && state != State.STARTING) { return; } eventExecutor.submit(new Runnable() { @Override public void run() { - // Only reconnect if the current state is running - if (state() != State.RUNNING) { + // Only reconnect if the current state is running or starting + if (state() != State.RUNNING && state() != State.STARTING) { return; } try { diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java index dcc2c6e7..e636d53c 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/InMemoryZKServer.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.slf4j.Logger; @@ -55,9 +56,12 @@ protected void startUp() throws Exception { FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir); zkServer.setTxnLogFactory(ftxn); zkServer.setTickTime(tickTime); + zkServer.setMinSessionTimeout(-1); + zkServer.setMaxSessionTimeout(-1); + zkServer.setZKDatabase(new ZKDatabase(ftxn)); factory = ServerCnxnFactory.createFactory(); - factory.configure(getAddress(port), -1); + factory.configure(getAddress(port), 1024); factory.startup(zkServer); LOG.info("In memory ZK started: " + getConnectionStr()); @@ -102,7 +106,7 @@ private InMemoryZKServer(File dataDir, int tickTime, boolean autoClean, int port public String getConnectionStr() { InetSocketAddress addr = factory.getLocalAddress(); - return String.format("%s:%d", addr.getHostName(), addr.getPort()); + return String.format("%s:%d", addr.getAddress().getHostAddress(), addr.getPort()); } public InetSocketAddress getLocalAddress() { diff --git a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java index f98b8f69..34c08770 100644 --- a/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java +++ b/twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/SettableOperationFuture.java @@ -53,7 +53,11 @@ public void addListener(final Runnable listener, final Executor exec) { super.addListener(new Runnable() { @Override public void run() { - exec.execute(listener); + try { + exec.execute(listener); + } catch (java.util.concurrent.RejectedExecutionException e) { + // Executor is shut down, ignore + } } }, executor); } diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java index 53e4324a..edeb89ca 100644 --- a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java +++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java @@ -48,7 +48,7 @@ /** * Test for {@link LeaderElection}. */ -public class LeaderElectionTest { + public class LeaderElectionTest { private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionTest.class);