Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
<surefire.redirectTestOutputToFile>true</surefire.redirectTestOutputToFile>
<slf4j.version>1.7.30</slf4j.version>
<logback.version>1.2.11</logback.version>
<guava.version>13.0.1</guava.version>
<guava.version>32.0.0-jre</guava.version>
<gson.version>2.2.4</gson.version>
<findbugs.jsr305.version>2.0.1</findbugs.jsr305.version>
<netty.version>4.1.75.Final</netty.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceController;
import org.apache.twill.common.Threads;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -93,9 +92,10 @@ public Future<? extends ServiceController> terminate(long gracefulTimeout, TimeU
}

terminationTimeoutMillis.compareAndSet(-1L, timeout);
stop();
stopAsync();
return Futures.transform(terminationFuture,
(Function<State, ServiceController>) input -> AbstractExecutionServiceController.this);
(Function<State, ServiceController>) input -> AbstractExecutionServiceController.this,
MoreExecutors.directExecutor());
}

@Nullable
Expand Down Expand Up @@ -129,29 +129,25 @@ public void terminated(State from) {
}, executor);
}

@Override
public void awaitTerminated() throws ExecutionException {
Uninterruptibles.getUninterruptibly(terminationFuture);
}

@Override
public void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException, ExecutionException {
Uninterruptibles.getUninterruptibly(terminationFuture, timeout, timeoutUnit);
}

public final void addListener(Listener listener, Executor executor) {
listenerExecutors.addListener(new ListenerExecutor(listener, executor));
}

@Override
public final ListenableFuture<State> start() {
public final Service startAsync() {
serviceDelegate.addListener(listenerExecutors, Threads.SAME_THREAD_EXECUTOR);
return serviceDelegate.start();
serviceDelegate.startAsync();
return this;
}

@Override
public final State startAndWait() {
return Futures.getUnchecked(start());
public final void awaitRunning() {
serviceDelegate.awaitRunning();
}

@Override
public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
serviceDelegate.awaitRunning(timeout, unit);
}

@Override
Expand All @@ -165,13 +161,24 @@ public final State state() {
}

@Override
public final State stopAndWait() {
return Futures.getUnchecked(stop());
public final Service stopAsync() {
serviceDelegate.stopAsync();
return this;
}

@Override
public final void awaitTerminated() {
serviceDelegate.awaitTerminated();
}

@Override
public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
serviceDelegate.awaitTerminated(timeout, unit);
}

@Override
public final ListenableFuture<State> stop() {
return serviceDelegate.stop();
public final Throwable failureCause() {
return serviceDelegate.failureCause();
}

protected Executor executor(final State state) {
Expand Down Expand Up @@ -213,15 +220,15 @@ protected void shutDown() throws Exception {
}

@Override
protected Executor executor(State state) {
return AbstractExecutionServiceController.this.executor(state);
protected Executor executor() {
return AbstractExecutionServiceController.this.executor(state());
}
}

/**
* Inner class for dispatching listener call back to a list of listeners.
*/
private static final class ListenerExecutors implements Listener {
private static final class ListenerExecutors extends Listener {

private interface Callback {
void call(Listener listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.twill.internal;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
Expand All @@ -27,6 +26,7 @@
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.twill.api.Command;
Expand Down Expand Up @@ -54,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -102,7 +103,7 @@ public AbstractTwillController(String appName, RunId runId, ZKClient zkClient, b
@Override
protected synchronized void doStartUp() {
if (kafkaClient != null && !logHandlers.isEmpty()) {
kafkaClient.startAndWait();
kafkaClient.startAsync().awaitRunning();
logCancellable = kafkaClient.getConsumer().prepare()
.addFromBeginning(Constants.LOG_TOPIC, 0)
.consume(new LogMessageCallback(logHandlers));
Expand All @@ -119,7 +120,7 @@ protected synchronized void doShutDown() {
}
if (kafkaClient != null) {
// Safe to call stop no matter what state the KafkaClientService is in.
kafkaClient.stopAndWait();
kafkaClient.stopAsync().awaitTerminated();
}
}

Expand All @@ -133,7 +134,7 @@ public final synchronized void addLogHandler(LogHandler handler) {

logHandlers.add(handler);
if (logHandlers.size() == 1) {
kafkaClient.startAndWait();
kafkaClient.startAsync().awaitRunning();
logCancellable = kafkaClient.getConsumer().prepare()
.addFromBeginning(Constants.LOG_TOPIC, 0)
.consume(new LogMessageCallback(logHandlers));
Expand Down Expand Up @@ -198,7 +199,7 @@ public ListenableFuture<String> restartInstances(final String runnable, Set<Inte
public String apply(Set<String> input) {
return runnable;
}
});
}, MoreExecutors.directExecutor());
}

@Override
Expand Down Expand Up @@ -280,7 +281,7 @@ public long onReceived(Iterator<FetchedMessage> messages) {
long nextOffset = -1L;
while (messages.hasNext()) {
FetchedMessage message = messages.next();
String json = Charsets.UTF_8.decode(message.getPayload()).toString();
String json = StandardCharsets.UTF_8.decode(message.getPayload()).toString();
try {
LogEntry entry = GSON.fromJson(json, LogEntry.class);
if (entry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.twill.internal;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.FutureCallback;
Expand Down Expand Up @@ -46,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -144,7 +144,7 @@ protected Gson getLiveNodeGson() {
@Override
public ListenableFuture<String> onReceived(String messageId, Message message) {
LOG.info("Message received: {}", message);
return Futures.immediateCheckedFuture(messageId);
return Futures.immediateFuture(messageId);
}

@Override
Expand All @@ -164,10 +164,10 @@ protected final void startUp() throws Exception {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.Expired) {
LOG.warn("ZK Session expired for service {} with runId {}.", getServiceName(), runId.getId());
LOG.warn("ZK Session expired for service {} with runId {}.", serviceName(), runId.getId());
expired = true;
} else if (event.getState() == Event.KeeperState.SyncConnected && expired) {
LOG.info("Reconnected after expiration for service {} with runId {}", getServiceName(), runId.getId());
LOG.info("Reconnected after expiration for service {} with runId {}", serviceName(), runId.getId());
expired = false;
logIfFailed(createLiveNode());
}
Expand Down Expand Up @@ -204,7 +204,7 @@ protected final void shutDown() throws Exception {
} finally {
// Given at most 5 seconds to cleanup ZK nodes
removeLiveNode().get(5, TimeUnit.SECONDS);
LOG.info("Service {} with runId {} shutdown completed", getServiceName(), runId.getId());
LOG.info("Service {} with runId {} shutdown completed", serviceName(), runId.getId());
}
}

Expand Down Expand Up @@ -277,6 +277,7 @@ public void onFailure(Throwable t) {
LOG.error("Failed to watch messages.", t);
}
}, Threads.SAME_THREAD_EXECUTOR);

}

private void processMessage(final String path, final String messageId) {
Expand All @@ -292,7 +293,8 @@ public void onSuccess(NodeData result) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Message received from {}: {}", path, new String(MessageCodec.encode(message), Charsets.UTF_8));
LOG.debug("Message received from {}: {}",
path, new String(MessageCodec.encode(message), StandardCharsets.UTF_8));
}

// Handle the stop message
Expand Down Expand Up @@ -328,18 +330,19 @@ private boolean handleStopMessage(Message message, final Runnable messageRemover
terminationTimeoutMillis.compareAndSet(-1L, timeoutMillis);

// Stop this service.
Futures.addCallback(stop(), new FutureCallback<State>() {
addListener(new ServiceListenerAdapter() {
@Override
public void onSuccess(State result) {
public void terminated(State from) {
messageRemover.run();
}

@Override
public void onFailure(Throwable t) {
LOG.error("Stop service failed upon STOP command", t);
public void failed(State from, Throwable failure) {
LOG.error("Stop service failed upon STOP command", failure);
messageRemover.run();
}
}, Threads.SAME_THREAD_EXECUTOR);
stopAsync();
return true;
}

Expand Down Expand Up @@ -391,7 +394,7 @@ public void onSuccess(T result) {

@Override
public void onFailure(Throwable t) {
LOG.error("Operation failed for service {} with runId {}", getServiceName(), runId, t);
LOG.error("Operation failed for service {} with runId {}", serviceName(), runId, t);
}
}, Threads.SAME_THREAD_EXECUTOR);
}
Expand All @@ -410,6 +413,6 @@ private byte[] serializeLiveNode() {
if (liveNodeData != null) {
content.add("data", getLiveNodeGson().toJsonTree(liveNodeData));
}
return GSON.toJson(content).getBytes(Charsets.UTF_8);
return GSON.toJson(content).getBytes(StandardCharsets.UTF_8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ protected synchronized void forceShutDown() {
// In force shutdown, don't send message.
stopMessageFuture = Futures.immediateFuture(State.TERMINATED);
}
stop();
stopAsync();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.UncheckedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -53,9 +52,9 @@ protected void startUp() throws Exception {

for (Service service : services) {
try {
service.startAndWait();
} catch (UncheckedExecutionException e) {
failureCause = e.getCause();
service.startAsync().awaitRunning();
} catch (IllegalStateException e) {
failureCause = e.getCause() != null ? e.getCause() : e;
break;
}
}
Expand Down Expand Up @@ -88,12 +87,12 @@ private void stopAll() throws Exception {
Service service = itor.next();
try {
if (service.isRunning() || service.state() == State.STARTING) {
service.stopAndWait();
service.stopAsync().awaitTerminated();
}
} catch (UncheckedExecutionException e) {
} catch (IllegalStateException e) {
// Just catch as we want all services stopped
if (failureCause == null) {
failureCause = e.getCause();
failureCause = e.getCause() != null ? e.getCause() : e;
} else {
// Log for sub-sequence service shutdown error, as only the first failure cause will be thrown.
LOG.warn("Failed to stop service {}", service, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ElectionRegistry(ZKClient zkClient) {
*/
public Cancellable register(String name, ElectionHandler handler) {
LeaderElection election = new LeaderElection(zkClient, name, handler);
election.start();
election.startAsync();
registry.put(name, election);
return new CancellableElection(name, election);
}
Expand All @@ -56,7 +56,7 @@ public Cancellable register(String name, ElectionHandler handler) {
*/
public void shutdown() {
for (LeaderElection election : registry.values()) {
election.stop();
election.stopAsync();
}
}

Expand All @@ -71,7 +71,7 @@ public CancellableElection(String name, LeaderElection election) {

@Override
public void cancel() {
election.stop();
election.stopAsync();
registry.remove(name, election);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* Wrapper for {@link Service.Listener} to have callback executed on a given {@link Executor}.
* Also make sure each method is called at most once.
*/
final class ListenerExecutor implements Service.Listener {
final class ListenerExecutor extends Service.Listener {

private static final Logger LOG = LoggerFactory.getLogger(ListenerExecutor.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* An adapter for implementing {@link Service.Listener} with all method default to no-op.
*/
public abstract class ServiceListenerAdapter implements Service.Listener {
public abstract class ServiceListenerAdapter extends Service.Listener {
@Override
public void starting() {
// No-op
Expand Down
Loading