Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cdap-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>test-jar</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,54 @@ public static void startAndWait(Service service, long timeout, TimeUnit timeoutU
throws TimeoutException, InterruptedException, ExecutionException {
startAndWait(service, timeout, timeoutUnit, null);
}

/**
* Starts a service and waits for it to be running, using reflection
* to be compatible with both Guava 13 and Guava 15+ / 20+.
*/
public static void startAndWait(Service service) {
try {
try {
// Guava 15+
service.getClass().getMethod("startAsync").invoke(service);
service.getClass().getMethod("awaitRunning").invoke(service);
} catch (NoSuchMethodException e) {
// Guava 13
Object future = service.getClass().getMethod("start").invoke(service);
if (future instanceof ListenableFuture) {
((ListenableFuture<?>) future).get();
}
}
} catch (Exception e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
}
}

/**
* Stops a service and waits for it to be terminated, using reflection
* to be compatible with both Guava 13 and Guava 15+ / 20+.
*/
public static void stopAndWait(Service service) {
try {
try {
// Guava 15+
service.getClass().getMethod("stopAsync").invoke(service);
service.getClass().getMethod("awaitTerminated").invoke(service);
} catch (NoSuchMethodException e) {
// Guava 13
Object future = service.getClass().getMethod("stop").invoke(service);
if (future instanceof ListenableFuture) {
((ListenableFuture<?>) future).get();
}
}
} catch (Exception e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new RuntimeException(e.getCause() != null ? e.getCause() : e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package io.cdap.cdap.gateway.router;

import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
Expand Down Expand Up @@ -141,7 +141,7 @@ public Optional<InetSocketAddress> getBoundAddress() {
protected void startUp() throws Exception {
// If internal authorization enforcement is enabled, we avoid re-initialization of the token manager.
if (SecurityUtil.isManagedSecurity(cConf) && !SecurityUtil.isInternalAuthEnabled(cConf)) {
tokenValidator.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(tokenValidator);
}
ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
serverCancellable = startServer(createServerBootstrap(channelGroup), channelGroup);
Expand All @@ -157,14 +157,14 @@ protected void shutDown() {
serverCancellable.cancel();
// If internal authorization enforcement is enabled, we avoid duplicate cleanup of the token manager.
if (SecurityUtil.isManagedSecurity(cConf) && !SecurityUtil.isInternalAuthEnabled(cConf)) {
tokenValidator.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(tokenValidator);

}

LOG.info("Stopped Netty Router.");
}

@Override
protected Executor executor(final State state) {
protected Executor executor() {
final AtomicInteger id = new AtomicInteger();
return runnable -> {
Thread t = new Thread(runnable, String.format("NettyRouter-%d", id.incrementAndGet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void init(String[] args) {
LOG.info("Router initialized.");
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
throw Throwables.propagate(t);
throw new RuntimeException(t);
}
}

Expand All @@ -116,7 +116,7 @@ public void start() throws Exception {
+ "ZooKeeper quorum settings are correct in "
+ "cdap-site.xml. Currently configured as: %s",
zkClientService.getConnectString()));
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);
LOG.info("Router started.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.cdap.cdap.gateway;

import com.google.common.io.Closeables;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
Expand Down Expand Up @@ -179,38 +178,38 @@ protected void configure() {

messagingService = injector.getInstance(MessagingService.class);
if (messagingService instanceof Service) {
((Service) messagingService).startAndWait();
io.cdap.cdap.common.service.Services.startAndWait((Service) messagingService);
}
txService = injector.getInstance(TransactionManager.class);
txService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(txService);
// Define all StructuredTable before starting any services that need StructuredTable
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));
metadataStorage = injector.getInstance(MetadataStorage.class);
metadataStorage.createIndex();
metadataService = injector.getInstance(MetadataService.class);
metadataService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(metadataService);

dsOpService = injector.getInstance(DatasetOpExecutorService.class);
dsOpService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(dsOpService);
datasetService = injector.getInstance(DatasetService.class);
datasetService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(datasetService);
appFabricServer = injector.getInstance(AppFabricServer.class);
appFabricServer.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(appFabricServer);
appFabricProcessorService = injector.getInstance(AppFabricProcessorService.class);
appFabricProcessorService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(appFabricProcessorService);
logQueryService = injector.getInstance(LogQueryService.class);
logQueryService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(logQueryService);
metricsQueryService = injector.getInstance(MetricsQueryService.class);
metricsQueryService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(metricsQueryService);
metricsCollectionService = injector.getInstance(MetricsCollectionService.class);
metricsCollectionService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(metricsCollectionService);
namespaceAdmin = injector.getInstance(NamespaceAdmin.class);
namespaceAdmin.create(TEST_NAMESPACE_META1);
namespaceAdmin.create(TEST_NAMESPACE_META2);

// Restart handlers to check if they are resilient across restarts.
router = injector.getInstance(NettyRouter.class);
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);
port = router.getBoundAddress().orElseThrow(IllegalStateException::new).getPort();

return injector;
Expand All @@ -220,19 +219,25 @@ public static void stopGateway(CConfiguration conf) throws Exception {
namespaceAdmin.delete(new NamespaceId(TEST_NAMESPACE1));
namespaceAdmin.delete(new NamespaceId(TEST_NAMESPACE2));
namespaceAdmin.delete(NamespaceId.DEFAULT);
appFabricServer.stopAndWait();
appFabricProcessorService.stopAndWait();
metricsCollectionService.stopAndWait();
metricsQueryService.stopAndWait();
logQueryService.stopAndWait();
router.stopAndWait();
datasetService.stopAndWait();
dsOpService.stopAndWait();
metadataService.stopAndWait();
Closeables.closeQuietly(metadataStorage);
txService.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(appFabricServer);
io.cdap.cdap.common.service.Services.stopAndWait(appFabricProcessorService);
io.cdap.cdap.common.service.Services.stopAndWait(metricsCollectionService);
io.cdap.cdap.common.service.Services.stopAndWait(metricsQueryService);
io.cdap.cdap.common.service.Services.stopAndWait(logQueryService);
io.cdap.cdap.common.service.Services.stopAndWait(router);
io.cdap.cdap.common.service.Services.stopAndWait(datasetService);
io.cdap.cdap.common.service.Services.stopAndWait(dsOpService);
io.cdap.cdap.common.service.Services.stopAndWait(metadataService);
try {

metadataStorage.close();

} catch (Exception ignored) {

}
io.cdap.cdap.common.service.Services.stopAndWait(txService);
if (messagingService instanceof Service) {
((Service) messagingService).stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait((Service) messagingService);
}
conf.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,30 +149,29 @@ protected void configure() {
}));

transactionManager = injector.getInstance(TransactionManager.class);
transactionManager.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(transactionManager);
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));
dsOpService = injector.getInstance(DatasetOpExecutorService.class);
dsOpService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(dsOpService);

datasetService = injector.getInstance(DatasetService.class);
datasetService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(datasetService);

logQueryService = injector.getInstance(LogQueryService.class);
logQueryService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(logQueryService);

mockLogReader = (MockLogReader) injector.getInstance(LogReader.class);
mockLogReader = (MockLogReader) injector.getInstance(LogReader.class);
mockLogReader.generateLogs();

discoveryServiceClient = injector.getInstance(DiscoveryServiceClient.class);
}

@AfterClass
public static void tearDown() {
logQueryService.stopAndWait();

datasetService.stopAndWait();
dsOpService.stopAndWait();
transactionManager.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(logQueryService);
io.cdap.cdap.common.service.Services.stopAndWait(datasetService);
io.cdap.cdap.common.service.Services.stopAndWait(dsOpService);
io.cdap.cdap.common.service.Services.stopAndWait(transactionManager);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,20 @@ protected void configure() {
}));

transactionManager = injector.getInstance(TransactionManager.class);
transactionManager.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(transactionManager);
StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class));

dsOpService = injector.getInstance(DatasetOpExecutorService.class);
dsOpService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(dsOpService);

datasetService = injector.getInstance(DatasetService.class);
datasetService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(datasetService);

metrics = injector.getInstance(MetricsQueryService.class);
metrics.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(metrics);

collectionService = injector.getInstance(MetricsCollectionService.class);
collectionService.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(collectionService);

// initialize the dataset instantiator
DiscoveryServiceClient discoveryClient = injector.getInstance(DiscoveryServiceClient.class);
Expand All @@ -202,11 +202,11 @@ protected void configure() {
}

public static void stopMetricsService(CConfiguration conf) {
collectionService.stopAndWait();
datasetService.stopAndWait();
dsOpService.stopAndWait();
transactionManager.stopAndWait();
metrics.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(collectionService);
io.cdap.cdap.common.service.Services.stopAndWait(datasetService);
io.cdap.cdap.common.service.Services.stopAndWait(dsOpService);
io.cdap.cdap.common.service.Services.stopAndWait(transactionManager);
io.cdap.cdap.common.service.Services.stopAndWait(metrics);
conf.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public static void init() throws Exception {
successValidator,
new MockAccessTokenIdentityExtractor(successValidator), discoveryService,
new NoOpAeadCipher());
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);

httpService = NettyHttpService.builder("test").setHttpHandlers(new TestHandler()).build();
httpService.start();
Expand All @@ -119,7 +119,7 @@ public static void init() throws Exception {
public static void finish() throws Exception {
cancelDiscovery.cancel();
httpService.stop();
router.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(router);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ protected void startUp() {
new RouterServiceLookup(cConf, (DiscoveryServiceClient) discoveryService,
new RouterPathLookup()),
validator, userIdentityExtractor, discoveryServiceClient, new NoOpAeadCipher());
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);
}

@Override
protected void shutDown() {
router.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(router);
}

InetSocketAddress getRouterAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void init() throws Exception {
successValidator,
new MockAccessTokenIdentityExtractor(successValidator), discoveryService,
new NoOpAeadCipher());
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);

httpService = NettyHttpService.builder("test").setHttpHandlers(new AuditLogTest.TestHandler())
.build();
Expand Down Expand Up @@ -121,7 +121,7 @@ public void testRouterStatus() throws Exception {
public static void finish() throws Exception {
cancelDiscovery.cancel();
httpService.stop();
router.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(router);
}

private void testGet(int expectedStatus, String expectedResponse, String path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ protected void startUp() {
new RouterPathLookup()),
new SuccessTokenValidator(), userIdentityExtractor, discoveryServiceClient,
new NoOpAeadCipher());
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);
}

@Override
protected void shutDown() {
router.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(router);
}

public InetSocketAddress getRouterAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ protected void startUp() {
new RouterPathLookup()),
new SuccessTokenValidator(), userIdentityExtractor, discoveryServiceClient,
new NoOpAeadCipher());
router.startAndWait();
io.cdap.cdap.common.service.Services.startAndWait(router);
}

@Override
protected void shutDown() {
router.stopAndWait();
io.cdap.cdap.common.service.Services.stopAndWait(router);
}

public InetSocketAddress getRouterAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private void deploy(int num) throws Exception {

LocationFactory lf = new LocalLocationFactory(TMP_FOLDER.newFolder());
Location programJar = AppJarHelper.createDeploymentJar(lf, DummyApp.class);
GATEWAY_SERVER.setExpectedJarBytes(ByteStreams.toByteArray(Locations.newInputSupplier(programJar)));
GATEWAY_SERVER.setExpectedJarBytes(ByteStreams.toByteArray(Locations.newInputSupplier(programJar).getInput()));

for (int i = 0; i < num; i++) {
LOG.info("Deploying {}/{}", i, num);
Expand All @@ -220,7 +220,7 @@ private void deploy(int num) throws Exception {
urlConn.setDoOutput(true);
urlConn.setDoInput(true);

ByteStreams.copy(Locations.newInputSupplier(programJar), urlConn.getOutputStream());
ByteStreams.copy(Locations.newInputSupplier(programJar).getInput(), urlConn.getOutputStream());
Assert.assertEquals(200, urlConn.getResponseCode());
urlConn.getInputStream().close();
urlConn.disconnect();
Expand Down
Loading