Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 70 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
<surefire.redirectTestOutputToFile>true</surefire.redirectTestOutputToFile>
<slf4j.version>1.7.30</slf4j.version>
<logback.version>1.2.11</logback.version>
<guava.version>13.0.1</guava.version>
<guava.version>32.0.0-jre</guava.version>
<gson.version>2.2.4</gson.version>
<findbugs.jsr305.version>2.0.1</findbugs.jsr305.version>
<netty.version>4.1.75.Final</netty.version>
Expand All @@ -94,6 +94,7 @@
<commons-compress.version>1.5</commons-compress.version>
<force.mac.tests>false</force.mac.tests>
<scala.version>2.12.10</scala.version>
<mockito.version>3.12.4</mockito.version>
</properties>

<build>
Expand Down Expand Up @@ -222,7 +223,7 @@
</configuration>
</plugin>
<!-- GPG signature -->
<plugin>
<!--<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
Expand All @@ -237,7 +238,7 @@
</goals>
</execution>
</executions>
</plugin>
</plugin>-->
</plugins>
</pluginManagement>
<plugins>
Expand Down Expand Up @@ -535,6 +536,40 @@
<properties>
<hadoop.version>2.6.5</hadoop.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.8</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/hadoop21</source>
<source>src/main/hadoop22</source>
<source>src/main/hadoop23</source>
<source>src/main/hadoop26</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop-3</id>
<properties>
<hadoop.version>3.3.6</hadoop.version>
<zookeeper.version>3.5.7</zookeeper.version>
<curator.version>5.2.0</curator.version>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
Expand All @@ -557,6 +592,7 @@
<source>src/main/hadoop22</source>
<source>src/main/hadoop23</source>
<source>src/main/hadoop26</source>
<source>src/main/hadoop3</source>
</sources>
</configuration>
</execution>
Expand Down Expand Up @@ -623,11 +659,31 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
Expand All @@ -638,6 +694,11 @@
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
Expand Down Expand Up @@ -899,6 +960,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.unitils</groupId>
<artifactId>unitils-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public final class Constants {
public static final String LOG_TOPIC = "log";

/** Maximum number of seconds for AM to start. */
public static final int APPLICATION_MAX_START_SECONDS = 60;
public static final int APPLICATION_MAX_START_SECONDS = 180;
/** Maximum number of seconds for AM to stop. */
public static final int APPLICATION_MAX_STOP_SECONDS = 60;
public static final int APPLICATION_MAX_STOP_SECONDS = 180;

public static final long PROVISION_TIMEOUT = 30000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
Expand Down Expand Up @@ -93,9 +94,10 @@ public Future<? extends ServiceController> terminate(long gracefulTimeout, TimeU
}

terminationTimeoutMillis.compareAndSet(-1L, timeout);
stop();
stopAsync();
return Futures.transform(terminationFuture,
(Function<State, ServiceController>) input -> AbstractExecutionServiceController.this);
(Function<State, ServiceController>) input -> AbstractExecutionServiceController.this,
MoreExecutors.directExecutor());
}

@Nullable
Expand Down Expand Up @@ -130,55 +132,72 @@ public void terminated(State from) {
}

@Override
public void awaitTerminated() throws ExecutionException {
Uninterruptibles.getUninterruptibly(terminationFuture);
public void awaitTerminated() {
try {
Uninterruptibles.getUninterruptibly(terminationFuture);
} catch (ExecutionException e) {
throw new IllegalStateException(e.getCause());
}
}

@Override
public void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException, ExecutionException {
Uninterruptibles.getUninterruptibly(terminationFuture, timeout, timeoutUnit);
public void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException {
try {
Uninterruptibles.getUninterruptibly(terminationFuture, timeout, timeoutUnit);
} catch (ExecutionException e) {
// Consuming ExecutionException to match interface or rethrow runtime
throw new IllegalStateException(e.getCause());
}
}

public final void addListener(Listener listener, Executor executor) {
listenerExecutors.addListener(new ListenerExecutor(listener, executor));
}

@Override
public final ListenableFuture<State> start() {
public final Service startAsync() {
serviceDelegate.addListener(listenerExecutors, Threads.SAME_THREAD_EXECUTOR);
return serviceDelegate.start();
return serviceDelegate.startAsync();
}

@Override
public final State startAndWait() {
return Futures.getUnchecked(start());
public final Service stopAsync() {
serviceDelegate.stopAsync();
return this;
}

@Override
public final boolean isRunning() {
return serviceDelegate.isRunning();
public final void awaitRunning() {
serviceDelegate.awaitRunning();
}

@Override
public final State state() {
return serviceDelegate.state();
public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
serviceDelegate.awaitRunning(timeout, unit);
}

@Override
public final State stopAndWait() {
return Futures.getUnchecked(stop());
public final Throwable failureCause() {
return serviceDelegate.failureCause();
}

@Override
public final ListenableFuture<State> stop() {
return serviceDelegate.stop();
public final boolean isRunning() {
return serviceDelegate.isRunning();
}

protected Executor executor(final State state) {
@Override
public final State state() {
return serviceDelegate.state();
}



protected Executor executor() {
return new Executor() {
@Override
public void execute(Runnable command) {
Thread t = new Thread(command, getClass().getSimpleName() + " " + state);
Thread t = new Thread(command, getClass().getSimpleName() + " " + state());
t.setDaemon(true);
t.start();
}
Expand Down Expand Up @@ -213,15 +232,15 @@ protected void shutDown() throws Exception {
}

@Override
protected Executor executor(State state) {
return AbstractExecutionServiceController.this.executor(state);
protected Executor executor() {
return AbstractExecutionServiceController.this.executor();
}
}

/**
* Inner class for dispatching listener call back to a list of listeners.
*/
private static final class ListenerExecutors implements Listener {
private static final class ListenerExecutors extends Listener {

private interface Callback {
void call(Listener listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.api.logging.LogThrowable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.twill.internal.json.LogEntryDecoder;
Expand Down Expand Up @@ -102,7 +103,7 @@ public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, b
@Override
protected synchronized void doStartUp() {
if (kafkaClient != null && !logHandlers.isEmpty()) {
kafkaClient.startAndWait();
kafkaClient.startAsync().awaitRunning();
logCancellable = kafkaClient.getConsumer().prepare()
.addFromBeginning(Constants.LOG_TOPIC, 0)
.consume(new LogMessageCallback(logHandlers));
Expand All @@ -119,7 +120,7 @@ protected synchronized void doShutDown() {
}
if (kafkaClient != null) {
// Safe to call stop no matter what state the KafkaClientService is in.
kafkaClient.stopAndWait();
kafkaClient.stopAsync().awaitTerminated();
}
}

Expand All @@ -133,7 +134,7 @@ public final synchronized void addLogHandler(LogHandler handler) {

logHandlers.add(handler);
if (logHandlers.size() == 1) {
kafkaClient.startAndWait();
kafkaClient.startAsync().awaitRunning();
logCancellable = kafkaClient.getConsumer().prepare()
.addFromBeginning(Constants.LOG_TOPIC, 0)
.consume(new LogMessageCallback(logHandlers));
Expand Down Expand Up @@ -198,7 +199,7 @@ public ListenableFuture<String> restartInstances(final String runnable, Set<Inte
public String apply(Set<String> input) {
return runnable;
}
});
}, Threads.SAME_THREAD_EXECUTOR);
}

@Override
Expand Down
Loading