diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000000..be975570a20c --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "java.compile.nullAnalysis.mode": "disabled" +} \ No newline at end of file diff --git a/cdap-common/src/main/java/com/google/common/io/InputSupplier.java b/cdap-common/src/main/java/com/google/common/io/InputSupplier.java new file mode 100644 index 000000000000..5500bef7e440 --- /dev/null +++ b/cdap-common/src/main/java/com/google/common/io/InputSupplier.java @@ -0,0 +1,38 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.common.io; + +import java.io.IOException; + +/** + * Stub for the removed {@code com.google.common.io.InputSupplier} interface. + * This interface was removed in Guava 21 but is still referenced by the + * {@code common-http} library. This stub allows compilation to succeed. + * + * @param the type of input object supplied + */ +@FunctionalInterface +public interface InputSupplier { + + /** + * Returns an input object that can be used for reading. + * + * @return the input object + * @throws IOException if an I/O error occurs + */ + T getInput() throws IOException; +} diff --git a/cdap-common/src/main/java/com/google/common/io/OutputSupplier.java b/cdap-common/src/main/java/com/google/common/io/OutputSupplier.java new file mode 100644 index 000000000000..e06a643d8b7d --- /dev/null +++ b/cdap-common/src/main/java/com/google/common/io/OutputSupplier.java @@ -0,0 +1,38 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.common.io; + +import java.io.IOException; + +/** + * Stub for the removed {@code com.google.common.io.OutputSupplier} interface. + * This interface was removed in Guava 21 but is still referenced by the + * {@code twill} library. This stub allows compilation to succeed. + * + * @param the type of output object supplied + */ +@FunctionalInterface +public interface OutputSupplier { + + /** + * Returns an output object that can be used for writing. + * + * @return the output object + * @throws IOException if an I/O error occurs + */ + T getOutput() throws IOException; +} diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/HttpExceptionHandler.java b/cdap-common/src/main/java/io/cdap/cdap/common/HttpExceptionHandler.java index 5e345694fd8b..a688b88e8f9c 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/HttpExceptionHandler.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/HttpExceptionHandler.java @@ -16,7 +16,7 @@ package io.cdap.cdap.common; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; import com.google.common.base.Throwables; import io.cdap.cdap.api.common.HttpErrorStatusProvider; import io.cdap.cdap.api.service.ServiceUnavailableException; @@ -76,7 +76,7 @@ public void handle(Throwable t, HttpRequest request, HttpResponder responder) { // If it is not some known exception type, response with 500. LOG.error("Unexpected error: request={} {} user={}:", request.method().name(), request.getUri(), - Objects.firstNonNull(SecurityRequestContext.getUserId(), ""), t); + MoreObjects.firstNonNull(SecurityRequestContext.getUserId(), ""), t); responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, Throwables.getRootCause(t).getMessage()); } @@ -84,6 +84,6 @@ public void handle(Throwable t, HttpRequest request, HttpResponder responder) { private void logWithTrace(HttpRequest request, Throwable t) { LOG.trace("Error in handling request={} {} for user={}:", request.method().name(), request.getUri(), - Objects.firstNonNull(SecurityRequestContext.getUserId(), ""), t); + MoreObjects.firstNonNull(SecurityRequestContext.getUserId(), ""), t); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/app/MainClassLoader.java b/cdap-common/src/main/java/io/cdap/cdap/common/app/MainClassLoader.java index f5012269ab72..4a65226a3da3 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/app/MainClassLoader.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/app/MainClassLoader.java @@ -16,7 +16,6 @@ package io.cdap.cdap.common.app; -import com.google.common.base.Splitter; import com.google.common.base.Throwables; import io.cdap.cdap.api.dataset.Dataset; import io.cdap.cdap.common.dataset.DatasetClassRewriter; @@ -24,17 +23,13 @@ import io.cdap.cdap.common.lang.ClassPathResources; import io.cdap.cdap.common.lang.CombineClassLoader; import io.cdap.cdap.common.lang.FilterClassLoader; -import io.cdap.cdap.common.lang.GuavaClassRewriter; import io.cdap.cdap.common.lang.InterceptableClassLoader; import io.cdap.cdap.common.leveldb.LevelDBClassRewriter; import io.cdap.cdap.common.security.AuthEnforceRewriter; -import io.cdap.cdap.common.utils.DirUtils; import io.cdap.cdap.internal.asm.Classes; import java.io.ByteArrayInputStream; -import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; @@ -53,7 +48,6 @@ public class MainClassLoader extends InterceptableClassLoader { private static final String DATASET_CLASS_NAME = Dataset.class.getName(); - private final GuavaClassRewriter guavaClassRewriter; private final DatasetClassRewriter datasetRewriter; private final AuthEnforceRewriter authEnforceRewriter; private final LevelDBClassRewriter levelDBClassRewriter; @@ -137,7 +131,6 @@ public static MainClassLoader createFromContext(FilterClassLoader.Filter filter, */ public MainClassLoader(URL[] urls, ClassLoader parent) { super(urls, parent); - this.guavaClassRewriter = new GuavaClassRewriter(); this.datasetRewriter = new DatasetClassRewriter(); this.authEnforceRewriter = new AuthEnforceRewriter(); this.levelDBClassRewriter = new LevelDBClassRewriter(); @@ -150,7 +143,8 @@ protected boolean needIntercept(String className) { try { return isRewriteNeeded(className); } catch (IOException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -159,10 +153,6 @@ protected boolean needIntercept(String className) { public byte[] rewriteClass(String className, InputStream input) throws IOException { byte[] rewrittenCode = null; - if (guavaClassRewriter.needRewrite(className)) { - rewrittenCode = guavaClassRewriter.rewriteClass(className, input); - } - if (isDatasetRewriteNeeded(className)) { rewrittenCode = datasetRewriter.rewriteClass(className, input); } @@ -179,8 +169,7 @@ public byte[] rewriteClass(String className, InputStream input) throws IOExcepti } private boolean isRewriteNeeded(String className) throws IOException { - return guavaClassRewriter.needRewrite(className) - || levelDBClassRewriter.needRewrite(className) + return levelDBClassRewriter.needRewrite(className) || isDatasetRewriteNeeded(className) || isAuthRewriteNeeded(className); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/guice/FileContextProvider.java b/cdap-common/src/main/java/io/cdap/cdap/common/guice/FileContextProvider.java index 456954618a13..2af2ece5d34d 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/guice/FileContextProvider.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/guice/FileContextProvider.java @@ -69,7 +69,8 @@ private UserGroupInformation createUGI() { return UserGroupInformation.createRemoteUser(hdfsUser); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/guice/KafkaClientModule.java b/cdap-common/src/main/java/io/cdap/cdap/common/guice/KafkaClientModule.java index a90f9d361a51..3a9cc5e76801 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/guice/KafkaClientModule.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/guice/KafkaClientModule.java @@ -17,8 +17,6 @@ package io.cdap.cdap.common.guice; import com.google.common.util.concurrent.AbstractIdleService; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; import com.google.inject.Inject; import com.google.inject.Injector; @@ -32,6 +30,7 @@ import io.cdap.cdap.common.conf.KafkaConstants; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.twill.common.Cancellable; import org.apache.twill.internal.kafka.client.ZKBrokerService; @@ -129,23 +128,90 @@ public ZKClientService get() { // The logic doesn't need to be sophisticated since it is a private binding and only used by the // wrapping KafkaClientService and BrokerService, which they will make sure no duplicate calls will be // made to the start/stop methods. - return new ForwardingZKClientService(zkClientService) { - @Override - public ListenableFuture start() { - if (startedCount.getAndIncrement() == 0) { - return super.start(); - } - return Futures.immediateFuture(State.RUNNING); - } + final ZKClientService delegate = zkClientService; + return new RefCountZKClientService(delegate, startedCount); + } + } - @Override - public ListenableFuture stop() { - if (startedCount.decrementAndGet() == 0) { - return super.stop(); - } - return Futures.immediateFuture(State.TERMINATED); - } - }; + /** + * A {@link ZKClientService} wrapper using simple reference counting for start/stop. + * This wrapper is necessary because the underlying {@link ZKClientService} instance is shared between + * {@link DefaultKafkaClientService} and {@link DefaultBrokerService}. Both services manage its lifecycle; + * reference counting ensures the ZK client is only started once and remains open until both services are stopped. + */ + private static final class RefCountZKClientService extends ForwardingZKClientService { + + private final ZKClientService delegate; + private final AtomicInteger startedCount; + + RefCountZKClientService(ZKClientService delegate, AtomicInteger startedCount) { + super(delegate); + this.delegate = delegate; + this.startedCount = startedCount; + } + + @Override + public Service startAsync() { + if (startedCount.getAndIncrement() == 0) { + delegate.startAsync(); + } + return this; + } + + @Override + public Service stopAsync() { + if (startedCount.decrementAndGet() == 0) { + delegate.stopAsync(); + } + return this; + } + + @Override + public Throwable failureCause() { + return delegate.failureCause(); + } + + @Override + public void awaitRunning() { + delegate.awaitRunning(); + } + + @Override + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + delegate.awaitRunning(timeout, unit); + } + + @Override + public void awaitTerminated() { + // If the delegate was not actually stopped (ref count > 0), return immediately + if (startedCount.get() > 0) { + return; + } + delegate.awaitTerminated(); + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + // If the delegate was not actually stopped (ref count > 0), return immediately + if (startedCount.get() > 0) { + return; + } + delegate.awaitTerminated(timeout, unit); + } + + @Override + public State state() { + return delegate.state(); + } + + @Override + public boolean isRunning() { + return delegate.isRunning(); + } + + @Override + public void addListener(Listener listener, Executor executor) { + delegate.addListener(listener, executor); } } @@ -166,12 +232,12 @@ private abstract static class AbstractServiceWithZkClient ext @Override protected final void startUp() throws Exception { - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); try { - delegate.startAndWait(); + delegate.startAsync().awaitRunning(); } catch (Exception e) { try { - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } catch (Exception se) { e.addSuppressed(se); } @@ -182,16 +248,16 @@ protected final void startUp() throws Exception { @Override protected final void shutDown() throws Exception { try { - delegate.stopAndWait(); + delegate.stopAsync().awaitTerminated(); } catch (Exception e) { try { - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } catch (Exception se) { e.addSuppressed(se); } throw e; } - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } protected T getDelegate() { diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/http/AbstractBodyConsumer.java b/cdap-common/src/main/java/io/cdap/cdap/common/http/AbstractBodyConsumer.java index 9543e33fa80c..c991b2140798 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/http/AbstractBodyConsumer.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/http/AbstractBodyConsumer.java @@ -17,7 +17,6 @@ package io.cdap.cdap.common.http; import com.google.common.base.Throwables; -import com.google.common.io.Closeables; import io.cdap.http.BodyConsumer; import io.cdap.http.HttpResponder; import io.netty.buffer.ByteBuf; @@ -51,7 +50,8 @@ public void chunk(ByteBuf request, HttpResponder responder) { } request.readBytes(output, request.readableBytes()); } catch (IOException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -63,7 +63,8 @@ public final void finished(HttpResponder responder) { } onFinish(responder, file); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } finally { cleanup(); } @@ -74,7 +75,11 @@ public final void handleError(Throwable cause) { try { LOG.error("Failed to handle upload", cause); if (output != null) { - Closeables.closeQuietly(output); + try { + output.close(); + } catch (Exception ignored) { + // Ignored, as we are already in error handling mode and want to continue cleanup + } } onError(cause); // The netty-http framework will response with 500, no need to response in here. diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/http/CombineInputStream.java b/cdap-common/src/main/java/io/cdap/cdap/common/http/CombineInputStream.java index e0c0ddfd8809..fc834e9cbc0a 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/http/CombineInputStream.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/http/CombineInputStream.java @@ -16,7 +16,6 @@ package io.cdap.cdap.common.http; -import com.google.common.io.Closeables; import io.cdap.cdap.common.io.FileSeekableInputStream; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; @@ -82,7 +81,11 @@ public int available() throws IOException { @Override public void close() throws IOException { - Closeables.closeQuietly(bufferStream); + try { + bufferStream.close(); + } catch (Exception ignored) { + // Ignored to ensure spillStream is closed even if bufferStream.close() fails + } if (spillStream != null) { spillStream.close(); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/http/LocationBodyProducer.java b/cdap-common/src/main/java/io/cdap/cdap/common/http/LocationBodyProducer.java index 5e0447a43518..78232146db1b 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/http/LocationBodyProducer.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/http/LocationBodyProducer.java @@ -16,7 +16,6 @@ package io.cdap.cdap.common.http; -import com.google.common.io.Closeables; import io.cdap.http.BodyProducer; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -65,6 +64,10 @@ public void handleError(@Nullable Throwable throwable) { if (throwable != null) { LOG.warn("Error in sending location {}", location, throwable); } - Closeables.closeQuietly(inputStream); + try { + inputStream.close(); + } catch (Exception ignored) { + // Ignored during cleanup + } } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/http/SpillableBodyConsumer.java b/cdap-common/src/main/java/io/cdap/cdap/common/http/SpillableBodyConsumer.java index 10ee554be82f..bced329f9485 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/http/SpillableBodyConsumer.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/http/SpillableBodyConsumer.java @@ -16,8 +16,6 @@ package io.cdap.cdap.common.http; -import com.google.common.base.Throwables; -import com.google.common.io.Closeables; import io.cdap.http.BodyConsumer; import io.cdap.http.HttpResponder; import io.netty.buffer.ByteBuf; @@ -88,12 +86,20 @@ public void chunk(ByteBuf request, HttpResponder responder) { @Override public void finished(HttpResponder responder) { - Closeables.closeQuietly(outputStream); + try { + if (outputStream != null) { + outputStream.close(); + } + } catch (IOException ignored) { + // Ignored during cleanup + } try (InputStream is = new CombineInputStream(buffer, outputStream == null ? null : spillPath)) { processInput(is, responder); } catch (Exception e) { - Throwables.propagateIfPossible(e); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } throw new RuntimeException(String.format("Failed to process input from buffer%s", outputStream == null ? "" : " and spill path " + spillPath), e); } finally { @@ -103,7 +109,13 @@ public void finished(HttpResponder responder) { @Override public void handleError(Throwable cause) { - Closeables.closeQuietly(outputStream); + try { + if (outputStream != null) { + outputStream.close(); + } + } catch (IOException ignored) { + // Ignored during cleanup + } cleanup(); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/internal/guava/ClassPath.java b/cdap-common/src/main/java/io/cdap/cdap/common/internal/guava/ClassPath.java index 2f7d0e9c1fd9..346724726a33 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/internal/guava/ClassPath.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/internal/guava/ClassPath.java @@ -328,7 +328,7 @@ public String getSimpleName() { String innerClassName = className.substring(lastDollarSign + 1); // local and anonymous classes are prefixed with number (1,2,3...), anonymous classes are // entirely numeric whereas local classes have the user supplied name as a suffix - return CharMatcher.DIGIT.trimLeadingFrom(innerClassName); + return CharMatcher.digit().trimLeadingFrom(innerClassName); } String packageName = getPackageName(); if (packageName.isEmpty()) { diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/io/DFSSeekableInputStream.java b/cdap-common/src/main/java/io/cdap/cdap/common/io/DFSSeekableInputStream.java index 16ca5b46db1c..9e6575539a4a 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/io/DFSSeekableInputStream.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/io/DFSSeekableInputStream.java @@ -16,12 +16,10 @@ package io.cdap.cdap.common.io; -import com.google.common.io.Closeables; import java.io.Closeable; import java.io.IOException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Seekable; -import org.apache.twill.filesystem.Location; /** * Implementation of {@link SeekableInputStream} for {@link Location}. @@ -69,7 +67,11 @@ public void close() throws IOException { super.close(); } finally { if (sizeProvider instanceof Closeable) { - Closeables.closeQuietly((Closeable) sizeProvider); + try { + ((Closeable) sizeProvider).close(); + } catch (Exception ignored) { + // Ignored during cleanup + } } } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/io/DefaultCachingPathProvider.java b/cdap-common/src/main/java/io/cdap/cdap/common/io/DefaultCachingPathProvider.java index 15e0c8988c43..c36b99e3de6f 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/io/DefaultCachingPathProvider.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/io/DefaultCachingPathProvider.java @@ -25,6 +25,7 @@ import io.cdap.cdap.common.utils.DirUtils; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -123,7 +124,7 @@ void clearCache(String fileName, long lastModified) { } String getCacheName(Location location) { - return Hashing.md5().hashString(location.toURI().getPath()).toString() + "-" + return Hashing.md5().hashString(location.toURI().getPath(), StandardCharsets.UTF_8).toString() + "-" + location.getName(); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/io/InputSupplier.java b/cdap-common/src/main/java/io/cdap/cdap/common/io/InputSupplier.java new file mode 100644 index 000000000000..1ff1f6b18693 --- /dev/null +++ b/cdap-common/src/main/java/io/cdap/cdap/common/io/InputSupplier.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.cdap.common.io; + +import java.io.IOException; + +/** + * A replacement for the removed {@code com.google.common.io.InputSupplier}. + * Supplies an input stream of type {@code T}. + * + * @param the type of input object supplied + */ +@FunctionalInterface +public interface InputSupplier { + + /** + * Returns an input object that can be used for reading. + * + * @return the input object + * @throws IOException if an I/O error occurs + */ + T getInput() throws IOException; +} diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/io/Locations.java b/cdap-common/src/main/java/io/cdap/cdap/common/io/Locations.java index 4eb04253b402..eb227edf175b 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/io/Locations.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/io/Locations.java @@ -20,9 +20,6 @@ import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.io.ByteStreams; -import com.google.common.io.Closeables; -import com.google.common.io.InputSupplier; -import com.google.common.io.OutputSupplier; import io.cdap.cdap.common.lang.FunctionWithException; import io.cdap.cdap.common.lang.jar.BundleJarUtil; import io.cdap.cdap.common.utils.DirUtils; @@ -115,8 +112,14 @@ public SeekableInputStream getInput() throws IOException { return new DFSSeekableInputStream(input, createDFSStreamSizeProvider(fs, false, path, input)); } catch (Throwable t) { - Closeables.closeQuietly(input); - Throwables.propagateIfInstanceOf(t, IOException.class); + try { + input.close(); + } catch (Exception ignored) { + // Ignored during cleanup + } + if (t instanceof IOException) { + throw (IOException) t; + } throw new IOException(t); } } @@ -169,8 +172,14 @@ public SeekableInputStream run() throws IOException { throw new IOException("Failed to create SeekableInputStream from location " + location); } catch (Throwable t) { - Closeables.closeQuietly(input); - Throwables.propagateIfInstanceOf(t, IOException.class); + try { + input.close(); + } catch (Exception ignored) { + // Ignored during cleanup + } + if (t instanceof IOException) { + throw (IOException) t; + } throw new IOException(t); } } @@ -343,7 +352,9 @@ private static void expandTarStream(TarArchiveInputStream tis, File targetDir) DirUtils.mkdirs(output); } else { DirUtils.mkdirs(output.getParentFile()); - ByteStreams.copy(tis, com.google.common.io.Files.newOutputStreamSupplier(output)); + try (OutputStream os = java.nio.file.Files.newOutputStream(output.toPath())) { + ByteStreams.copy(tis, os); + } } entry = tis.getNextTarEntry(); } @@ -484,7 +495,8 @@ public static Location getLocationFromAbsolutePath(LocationFactory locationFacto return locationFactory.create(uri); } catch (URISyntaxException e) { // Should not happen. - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -570,7 +582,8 @@ public Method get() { } return getFileLengthMethod; } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } }); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/io/OutputSupplier.java b/cdap-common/src/main/java/io/cdap/cdap/common/io/OutputSupplier.java new file mode 100644 index 000000000000..52062bec9517 --- /dev/null +++ b/cdap-common/src/main/java/io/cdap/cdap/common/io/OutputSupplier.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.cdap.common.io; + +import java.io.IOException; + +/** + * A replacement for the removed {@code com.google.common.io.OutputSupplier}. + * Supplies an output stream of type {@code T}. + * + * @param the type of output object supplied + */ +@FunctionalInterface +public interface OutputSupplier { + + /** + * Returns an output object that can be used for writing. + * + * @return the output object + * @throws IOException if an I/O error occurs + */ + T getOutput() throws IOException; +} diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/lang/ClassLoaders.java b/cdap-common/src/main/java/io/cdap/cdap/common/lang/ClassLoaders.java index dfc46ccabd46..830dbeadd3e0 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/lang/ClassLoaders.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/lang/ClassLoaders.java @@ -16,7 +16,7 @@ package io.cdap.cdap.common.lang; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; import com.google.common.base.Splitter; import com.google.common.base.Throwables; import java.io.File; @@ -67,7 +67,7 @@ private ClassLoaders() { */ public static Class loadClass(String className, @Nullable ClassLoader classLoader, Object caller) throws ClassNotFoundException { - ClassLoader cl = Objects.firstNonNull(classLoader, caller.getClass().getClassLoader()); + ClassLoader cl = MoreObjects.firstNonNull(classLoader, caller.getClass().getClassLoader()); return cl.loadClass(className); } @@ -283,7 +283,8 @@ public static URL getClassPathURL(String className, URL classUrl) { return URI.create(path.substring(0, path.indexOf("!/"))).toURL(); } } catch (MalformedURLException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } throw new IllegalStateException("Unsupported class URL: " + classUrl); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/lang/DirectoryClassLoader.java b/cdap-common/src/main/java/io/cdap/cdap/common/lang/DirectoryClassLoader.java index 01f2299b37b9..4ce3f69e3801 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/lang/DirectoryClassLoader.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/lang/DirectoryClassLoader.java @@ -135,7 +135,8 @@ private static URL[] getClassPathURLs(File dir, @Nullable String extraClassPath, } catch (MalformedURLException e) { // Should never happen LOG.error("Error in adding jar URLs to classPathUrls", e); - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/lang/GuavaClassRewriter.java b/cdap-common/src/main/java/io/cdap/cdap/common/lang/GuavaClassRewriter.java deleted file mode 100644 index 87a848708337..000000000000 --- a/cdap-common/src/main/java/io/cdap/cdap/common/lang/GuavaClassRewriter.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Copyright © 2020 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.common.lang; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; -import io.cdap.cdap.internal.asm.Methods; -import java.io.IOException; -import java.io.InputStream; -import java.lang.invoke.LambdaMetafactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Executor; -import javax.annotation.Nullable; -import org.objectweb.asm.ClassReader; -import org.objectweb.asm.ClassVisitor; -import org.objectweb.asm.ClassWriter; -import org.objectweb.asm.Handle; -import org.objectweb.asm.MethodVisitor; -import org.objectweb.asm.Opcodes; -import org.objectweb.asm.Type; -import org.objectweb.asm.commons.GeneratorAdapter; -import org.objectweb.asm.commons.Method; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@link ClassRewriter} for rewriting Guava library classes to add missing functions that are - * available in later Guava library. Those new methods are being used by Hadoop 3 until 3.4 - * (HADOOP-17288). - * - * In particular, in the Preconditions class, there are new checkState and checkArgument methods - * that take different number of arguments for error message formatting purpose, instead of just the - * generic vararg method. Also, in the MoreExecutors class, there is a new directExecutor() method - * that replace the functionality of the the sameThreadExecutor(). - */ -public class GuavaClassRewriter implements ClassRewriter { - - private static final Logger LOG = LoggerFactory.getLogger(GuavaClassRewriter.class); - private static final String PRECONDTIONS_CLASS_NAME = "com.google.common.base.Preconditions"; - private static final String MORE_EXECUTORS_CLASS_NAME = "com.google.common.util.concurrent.MoreExecutors"; - private static final Type OBJECT_TYPE = Type.getType(Object.class); - private static final Type STRING_TYPE = Type.getType(String.class); - private static final Type RUNNABLE_TYPE = Type.getType(Runnable.class); - - /** - * Returns {@code true} if the given class needs to be rewritten. - */ - public boolean needRewrite(String className) { - return PRECONDTIONS_CLASS_NAME.equals(className) || MORE_EXECUTORS_CLASS_NAME.equals(className); - } - - @Nullable - @Override - public byte[] rewriteClass(String className, InputStream input) throws IOException { - if (PRECONDTIONS_CLASS_NAME.equals(className)) { - return rewritePreconditions(input); - } - if (MORE_EXECUTORS_CLASS_NAME.equals(className)) { - return rewriteMoreExecutors(input); - } - return null; - } - - /** - * Rewrites the {@link Preconditions} class to add various {@code checkArgument}, {@code - * checkState}, and {@code checkNotNull} methods that are missing in earlier Guava library. - * - * @param input the bytecode stream of the Preconditions class - * @return the rewritten bytecode - * @throws IOException if failed to rewrite - */ - private byte[] rewritePreconditions(InputStream input) throws IOException { - Type[] types = new Type[]{ - OBJECT_TYPE, - Type.CHAR_TYPE, - Type.INT_TYPE, - Type.LONG_TYPE - }; - - // Generates all the methods that we need to add to the Preconditions class - // There are multiple of them, each take one or combinations of two parameters of type Object, char, int, and long - // for the string formatting template to use - List methods = new ArrayList<>(); - // Carry the list of method templates for generating new methods. - // Each template contains the method name, the first argument type, and the return type. - List methodTemplates = Arrays.asList( - new Method("checkArgument", Type.VOID_TYPE, new Type[]{Type.BOOLEAN_TYPE}), - new Method("checkState", Type.VOID_TYPE, new Type[]{Type.BOOLEAN_TYPE}), - new Method("checkNotNull", Type.getType(Object.class), - new Type[]{Type.getType(Object.class)}) - ); - - for (Method method : methodTemplates) { - String methodName = method.getName(); - Type returnType = method.getReturnType(); - Type firstArgType = method.getArgumentTypes()[0]; - - for (Type type : types) { - methods.add(new Method(methodName, returnType, - new Type[]{firstArgType, STRING_TYPE, type})); - for (Type type2 : types) { - methods.add(new Method(methodName, returnType, - new Type[]{firstArgType, STRING_TYPE, type, type2})); - } - } - - // Later version of Preconditions class also added methods that take three and four Objects - methods.add(new Method(methodName, returnType, - new Type[]{firstArgType, STRING_TYPE, OBJECT_TYPE, OBJECT_TYPE, OBJECT_TYPE})); - methods.add(new Method(methodName, returnType, - new Type[]{firstArgType, STRING_TYPE, - OBJECT_TYPE, OBJECT_TYPE, OBJECT_TYPE, OBJECT_TYPE})); - } - - ClassReader cr = new ClassReader(input); - ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES); - cr.accept(new PreconditionsRewriter(Opcodes.ASM7, cw, methods), ClassReader.EXPAND_FRAMES); - return cw.toByteArray(); - } - - /** - * Rewrites the {@link MoreExecutors} class to add the {@code directExecutor} method. - * - * @param input the bytecode stream of the MoreExecutors class - * @return the rewritten bytecode - * @throws IOException if failed to rewrite - */ - private byte[] rewriteMoreExecutors(InputStream input) throws IOException { - ClassReader cr = new ClassReader(input); - ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES); - - Method method = new Method("directExecutor", Type.getType(Executor.class), new Type[0]); - cr.accept(new ClassVisitor(Opcodes.ASM7, cw) { - - private boolean hasMethod; - - @Override - public void visit(int version, int access, String name, String signature, String superName, - String[] interfaces) { - // Rewrite the class to 1.7 format so that we can use lambda to implement the directExecutor() method - super.visit(Opcodes.V1_7, access, name, signature, superName, interfaces); - } - - @Override - public MethodVisitor visitMethod(int access, String name, String descriptor, - String signature, String[] exceptions) { - if (method.equals(new Method(name, descriptor)) - && (access & Opcodes.ACC_PUBLIC) == Opcodes.ACC_PUBLIC - && (access & Opcodes.ACC_STATIC) == Opcodes.ACC_STATIC) { - hasMethod = true; - } - return super.visitMethod(access, name, descriptor, signature, exceptions); - } - - @Override - public void visitEnd() { - if (hasMethod) { - super.visitEnd(); - return; - } - - // Generate the method - // public static Executor directExecutor() { - // return Runnable::run; - // } - MethodVisitor mv = super.visitMethod(Opcodes.ACC_PUBLIC | Opcodes.ACC_STATIC, - method.getName(), method.getDescriptor(), null, null); - GeneratorAdapter adapter = new GeneratorAdapter(mv, Opcodes.ACC_PUBLIC | Opcodes.ACC_STATIC, - method.getName(), method.getDescriptor()); - // Perform the lambda invocation. - Handle metaFactoryHandle = new Handle(Opcodes.H_INVOKESTATIC, - Type.getType(LambdaMetafactory.class).getInternalName(), - "metafactory", Methods.LAMBDA_META_FACTORY_METHOD_DESC, false); - Handle lambdaMethodHandle = new Handle(Opcodes.H_INVOKEINTERFACE, - RUNNABLE_TYPE.getInternalName(), - "run", Type.getMethodDescriptor(Type.VOID_TYPE), true); - - // Signature of the Executor.execute(Runnable) - Type samMethodType = Type.getType(Type.getMethodDescriptor(Type.VOID_TYPE, RUNNABLE_TYPE)); - adapter.invokeDynamic("execute", Type.getMethodDescriptor(Type.getType(Executor.class)), - metaFactoryHandle, samMethodType, lambdaMethodHandle, samMethodType); - adapter.returnValue(); - adapter.endMethod(); - super.visitEnd(); - } - }, ClassReader.EXPAND_FRAMES); - - return cw.toByteArray(); - } - - /** - * A {@link ClassVisitor} to add a set of missing methods to the {@link Preconditions} class. - */ - private static final class PreconditionsRewriter extends ClassVisitor { - - private final Set methods; - - PreconditionsRewriter(int api, ClassVisitor classVisitor, Collection methods) { - super(api, classVisitor); - this.methods = new LinkedHashSet<>(methods); - } - - @Override - public MethodVisitor visitMethod(int access, String name, String descriptor, - String signature, String[] exceptions) { - Method method = new Method(name, descriptor); - if (methods.contains(method) - && (access & Opcodes.ACC_PUBLIC) == Opcodes.ACC_PUBLIC - && (access & Opcodes.ACC_STATIC) == Opcodes.ACC_STATIC) { - methods.remove(method); - } - return super.visitMethod(access, name, descriptor, signature, exceptions); - } - - @Override - public void visitEnd() { - for (Method method : methods) { - LOG.trace("{}.{} not found. Rewriting class to inject the missing method", - PRECONDTIONS_CLASS_NAME, method); - - // Generate the missing method that calls the version with varargs - // For example, - // - // public static void checkArgument(boolean expression, String template, Object arg) { - // checkArgument(expression, template, new Object[] { arg }); - // } - MethodVisitor mv = super.visitMethod(Opcodes.ACC_PUBLIC | Opcodes.ACC_STATIC, - method.getName(), method.getDescriptor(), null, null); - GeneratorAdapter adapter = new GeneratorAdapter(mv, Opcodes.ACC_PUBLIC | Opcodes.ACC_STATIC, - method.getName(), method.getDescriptor()); - adapter.loadArg(0); - adapter.loadArg(1); - - // New an array of size based on the number of parameters - int objectArray = adapter.newLocal(Type.getType(Object[].class)); - - Type[] argTypes = method.getArgumentTypes(); - adapter.push(argTypes.length - 2); - adapter.newArray(OBJECT_TYPE); - adapter.storeLocal(objectArray); - - // Put the arguments into the Object array - for (int i = 0; i < argTypes.length - 2; i++) { - Type argType = argTypes[i + 2]; - adapter.loadLocal(objectArray); - adapter.push(i); - adapter.loadArg(i + 2); - // If the given argument is not an Object, turn it into a String - if (argType.getSort() != Type.OBJECT) { - adapter.invokeStatic(STRING_TYPE, - new Method("valueOf", STRING_TYPE, new Type[]{argType})); - } - adapter.arrayStore(OBJECT_TYPE); - } - adapter.loadLocal(objectArray); - - // Call the varargs method - adapter.invokeStatic(Type.getObjectType(PRECONDTIONS_CLASS_NAME.replace('.', '/')), - new Method(method.getName(), method.getReturnType(), - new Type[]{ - method.getArgumentTypes()[0], - method.getArgumentTypes()[1], - Type.getType(Object[].class) - })); - adapter.returnValue(); - adapter.endMethod(); - } - - super.visitEnd(); - } - } -} diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/lang/InstantiatorFactory.java b/cdap-common/src/main/java/io/cdap/cdap/common/lang/InstantiatorFactory.java index 3616bc6fba22..5c42c215166d 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/lang/InstantiatorFactory.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/lang/InstantiatorFactory.java @@ -98,7 +98,8 @@ public T create() { try { return (T) defaultCons.newInstance(); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } }; @@ -184,7 +185,8 @@ public T create() { } return (T) instance; } catch (InstantiationException | IllegalAccessException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } }; diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/lang/PropertyFieldSetter.java b/cdap-common/src/main/java/io/cdap/cdap/common/lang/PropertyFieldSetter.java index fa1c3863b9bc..fb0aa5cefe48 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/lang/PropertyFieldSetter.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/lang/PropertyFieldSetter.java @@ -79,10 +79,12 @@ private void setValue(Object instance, Field field, String value) throws Illegal field.set(instance, fieldType.getMethod("valueOf", String.class).invoke(null, value)); } catch (NoSuchMethodException e) { // Should never happen, as boxed type always have the valueOf(String) method. - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } catch (InvocationTargetException e) { // Also should never happen, as calling method on Java bootstrap classes should always succeed. - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/logging/AbstractLoggingContext.java b/cdap-common/src/main/java/io/cdap/cdap/common/logging/AbstractLoggingContext.java index 11f95f46aaa0..fe88d0b2d816 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/logging/AbstractLoggingContext.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/logging/AbstractLoggingContext.java @@ -16,7 +16,7 @@ package io.cdap.cdap.common.logging; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; import com.google.common.collect.Maps; import java.lang.reflect.Method; import java.util.Collection; @@ -113,7 +113,7 @@ public Map getSystemTagsAsString() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("systemTags", systemTags) .toString(); } @@ -140,7 +140,7 @@ public String getValue() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("name", name) .add("value", value) .toString(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/resource/ResourceBalancerService.java b/cdap-common/src/main/java/io/cdap/cdap/common/resource/ResourceBalancerService.java index 42283b6e1fcf..6941c3de4120 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/resource/ResourceBalancerService.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/resource/ResourceBalancerService.java @@ -94,13 +94,13 @@ public void leader() { coordinator = new ResourceCoordinator(zk, discoveryServiceClient, new BalancedAssignmentStrategy()); - coordinator.startAndWait(); + coordinator.startAsync().awaitRunning(); } @Override public void follower() { if (coordinator != null) { - coordinator.stopAndWait(); + coordinator.stopAsync().awaitTerminated(); coordinator = null; } } @@ -130,8 +130,8 @@ protected void startUp() throws Exception { Discoverable discoverable = createDiscoverable(serviceName); cancelDiscoverable = discoveryService.register(ResolvingDiscoverable.of(discoverable)); - election.start(); - resourceClient.startAndWait(); + election.startAsync(); + resourceClient.startAsync().awaitRunning(); cancelResourceHandler = resourceClient.subscribe(serviceName, createResourceHandler(discoverable)); @@ -162,7 +162,8 @@ protected void shutDown() throws Exception { LOG.error("Exception while shutting down{}.", serviceName, th); } if (throwable != null) { - throw Throwables.propagate(throwable); + Throwables.throwIfUnchecked(throwable); + throw new RuntimeException(throwable); } LOG.info("Stopped ResourceBalancer {} service.", serviceName); } @@ -181,18 +182,18 @@ public void onChange(Collection partitionReplicas) { LOG.info("Partitions changed {}, service: {}", partitions, serviceName); try { if (service != null) { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } if (partitions.isEmpty() || !election.isRunning()) { service = null; } else { service = createService(partitions); - service.startAndWait(); + service.startAsync().awaitRunning(); } } catch (Throwable t) { LOG.error("Failed to change partitions, service: {}.", serviceName, t); completion.setException(t); - stop(); + stopAsync(); } } @@ -200,7 +201,7 @@ public void onChange(Collection partitionReplicas) { public void finished(Throwable failureCause) { try { if (service != null) { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); service = null; } completion.set(null); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/security/YarnTokenUtils.java b/cdap-common/src/main/java/io/cdap/cdap/common/security/YarnTokenUtils.java index f0cb950723f6..c92fadf93e88 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/security/YarnTokenUtils.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/security/YarnTokenUtils.java @@ -97,7 +97,8 @@ public static Credentials obtainToken(YarnConfiguration configuration, Credentia return credentials; } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/service/AbstractRetryableScheduledService.java b/cdap-common/src/main/java/io/cdap/cdap/common/service/AbstractRetryableScheduledService.java index 47a52840859f..0c65b55a7608 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/service/AbstractRetryableScheduledService.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/service/AbstractRetryableScheduledService.java @@ -17,6 +17,7 @@ package io.cdap.cdap.common.service; import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.common.util.concurrent.Service; import io.cdap.cdap.api.retry.RetriesExhaustedException; import io.cdap.cdap.common.logging.LogSamplers; import io.cdap.cdap.common.logging.Loggers; @@ -24,7 +25,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.twill.common.Threads; -import org.apache.twill.internal.ServiceListenerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,10 +56,22 @@ public abstract class AbstractRetryableScheduledService extends AbstractSchedule */ protected AbstractRetryableScheduledService(RetryStrategy retryStrategy) { this.retryStrategy = retryStrategy; - addListener(new ServiceListenerAdapter() { + addListener(new Service.Listener() { + @Override + public void starting() {} + + @Override + public void running() {} + + @Override + public void stopping(State from) {} + + @Override + public void terminated(State from) {} + @Override public void failed(State from, Throwable failure) { - LOG.error("Scheduled service {} terminated due to failure", getServiceName(), failure); + LOG.error("Scheduled service {} terminated due to failure", serviceName(), failure); } }, Threads.SAME_THREAD_EXECUTOR); } @@ -99,7 +111,7 @@ protected boolean shouldRetry(Exception ex) { * @throws Exception if startup of this service failed */ protected void doStartUp() throws Exception { - LOG.debug("Starting scheduled service {}", getServiceName()); + LOG.debug("Starting scheduled service {}", serviceName()); } /** @@ -109,20 +121,20 @@ protected void doStartUp() throws Exception { * @throws Exception if shutdown of this service failed */ protected void doShutdown() throws Exception { - LOG.debug("Stopping scheduled service {}", getServiceName()); + LOG.debug("Stopping scheduled service {}", serviceName()); } /** * Returns the name of this service. */ - protected String getServiceName() { + protected String serviceName() { return getClass().getSimpleName(); } @Override protected ScheduledExecutorService executor() { executor = Executors.newSingleThreadScheduledExecutor( - Threads.createDaemonThreadFactory(getServiceName())); + Threads.createDaemonThreadFactory(serviceName())); return executor; } @@ -191,7 +203,7 @@ protected final void runOneIteration() throws Exception { this.delayMillis = delayMillis; } } catch (Throwable t) { - LOG.error("Aborting service {} due to non-retryable error", getServiceName(), t); + LOG.error("Aborting service {} due to non-retryable error", serviceName(), t); throw t; } } @@ -210,6 +222,6 @@ protected Schedule getNextSchedule() { * Logs an exception raised by {@link #runTask()}. */ protected void logTaskFailure(Throwable t) { - outageLog.warn("Failed to execute task for scheduled service {}", getServiceName(), t); + outageLog.warn("Failed to execute task for scheduled service {}", serviceName(), t); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/service/CommandPortService.java b/cdap-common/src/main/java/io/cdap/cdap/common/service/CommandPortService.java index 654cf870dad2..d66cb3789ea2 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/service/CommandPortService.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/service/CommandPortService.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; -import java.io.Writer; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; @@ -44,7 +43,7 @@ * CommandPortService service = CommandPortService.builder("myservice") * .addCommandHandler("ruok", "Are you okay?", ruokHandler) * .build(); - * service.startAndWait(); + * service.startAsync().awaitRunning(); * * * To stop the service, invoke {@link #stop()} or {@link #stopAndWait()}. @@ -109,7 +108,8 @@ protected void triggerShutdown() { serverSocket.close(); } } catch (IOException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/service/RetryOnStartFailureService.java b/cdap-common/src/main/java/io/cdap/cdap/common/service/RetryOnStartFailureService.java index 55f616d5020e..aeee80b7fbe4 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/service/RetryOnStartFailureService.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/service/RetryOnStartFailureService.java @@ -18,8 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AbstractService; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.Uninterruptibles; import java.util.concurrent.TimeUnit; @@ -68,12 +67,10 @@ public void run() { while (!stopped) { try { - currentDelegate.start().get(); + currentDelegate.startAsync().awaitRunning(); // Only assigned the delegate if and only if the delegate service started successfully startedService = currentDelegate; break; - } catch (InterruptedException e) { - // This thread will be interrupted from the doStop() method. Don't reset the interrupt flag. } catch (Throwable t) { LOG.debug("Exception raised when starting service {}", delegateServiceName, t); @@ -112,25 +109,62 @@ protected void doStop() { // the setting of the startedService field. When that happens, the stop failure state is not propagated. // Nevertheless, there won't be any service left behind without stopping. if (startedService != null) { - Futures.addCallback(startedService.stop(), new FutureCallback() { + startedService.addListener(new Service.Listener() { @Override - public void onSuccess(State result) { + public void starting() {} + + @Override + public void running() {} + + @Override + public void stopping(State from) {} + + @Override + public void terminated(State from) { notifyStopped(); } @Override - public void onFailure(Throwable t) { - notifyFailed(t); + public void failed(State from, Throwable failure) { + notifyFailed(failure); } - }, Threads.SAME_THREAD_EXECUTOR); + }, MoreExecutors.directExecutor()); + startedService.stopAsync(); return; } - // If there is no started service, stop the current delete, but no need to propagate the stop state + // If there is no started service, stop the current delegate, but no need to propagate the stop state // because if the underlying service is not yet started due to failure, it shouldn't affect the stop state // of this retrying service. if (currentDelegate != null) { - currentDelegate.stop().addListener(this::notifyStopped, Threads.SAME_THREAD_EXECUTOR); + // If the delegate is already in a terminal state (FAILED or TERMINATED), stopAsync() won't + // trigger any listener callbacks, so we need to notify directly. + State delegateState = currentDelegate.state(); + if (delegateState == State.TERMINATED || delegateState == State.FAILED) { + notifyStopped(); + return; + } + currentDelegate.addListener(new Service.Listener() { + @Override + public void starting() {} + + @Override + public void running() {} + + @Override + public void stopping(State from) {} + + @Override + public void terminated(State from) { + notifyStopped(); + } + + @Override + public void failed(State from, Throwable failure) { + notifyStopped(); + } + }, Threads.SAME_THREAD_EXECUTOR); + currentDelegate.stopAsync(); return; } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/service/Services.java b/cdap-common/src/main/java/io/cdap/cdap/common/service/Services.java index 74d619454692..5d0dafed298c 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/service/Services.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/service/Services.java @@ -16,7 +16,6 @@ package io.cdap.cdap.common.service; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -51,9 +50,8 @@ private Services() { public static void startAndWait(Service service, long timeout, TimeUnit timeoutUnit, @Nullable String timeoutErrorMessage) throws TimeoutException, InterruptedException, ExecutionException { - ListenableFuture startFuture = service.start(); try { - startFuture.get(timeout, timeoutUnit); + service.startAsync().awaitRunning(timeout, timeoutUnit); } catch (TimeoutException e) { LOG.error(timeoutErrorMessage != null ? timeoutErrorMessage : "Timeout while waiting to start service.", e); @@ -62,19 +60,19 @@ public static void startAndWait(Service service, long timeout, TimeUnit timeoutU timeoutException.setStackTrace(e.getStackTrace()); } try { - service.stop(); + service.stopAsync(); } catch (Exception stopException) { LOG.error("Error while trying to stop service: ", stopException); } throw timeoutException; - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting to start service.", e); + } catch (IllegalStateException e) { + LOG.error("Failed to start service.", e); try { - service.stop(); + service.stopAsync(); } catch (Exception stopException) { LOG.error("Error while trying to stop service:", stopException); } - throw e; + throw new ExecutionException(e); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterTwillRunnable.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterTwillRunnable.java index d9fd6c75729c..95465b514f95 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterTwillRunnable.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/twill/AbstractMasterTwillRunnable.java @@ -37,7 +37,6 @@ import org.apache.twill.api.TwillContext; import org.apache.twill.api.TwillRunnableSpecification; import org.apache.twill.common.Threads; -import org.apache.twill.internal.ServiceListenerAdapter; import org.apache.twill.internal.Services; import org.apache.twill.kafka.client.BrokerService; import org.apache.twill.kafka.client.KafkaClientService; @@ -110,7 +109,8 @@ public final void initialize(TwillContext context) { Preconditions.checkArgument(!services.isEmpty(), "Should have at least one service"); LOG.info("Runnable initialized {}", name); } catch (Throwable t) { - throw Throwables.propagate(t); + Throwables.throwIfUnchecked(t); + throw new RuntimeException(t); } } @@ -137,7 +137,8 @@ public void run() { } catch (InterruptedException e) { LOG.debug("Waiting on latch interrupted {}", name); } catch (ExecutionException e) { - throw Throwables.propagate(e.getCause()); + Throwables.throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); } } @@ -153,7 +154,22 @@ public void destroy() { private Service.Listener createServiceListener(final String name, final SettableFuture future) { - return new ServiceListenerAdapter() { + return new Service.Listener() { + @Override + public void starting() { + // no-op + } + + @Override + public void running() { + // no-op + } + + @Override + public void stopping(Service.State from) { + // no-op + } + @Override public void terminated(Service.State from) { LOG.info("Service " + name + " terminated"); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java b/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java index 3074529dcf19..38432ef02361 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/twill/NoopTwillController.java @@ -16,6 +16,7 @@ package io.cdap.cdap.common.twill; +import com.google.common.util.concurrent.Service; import java.util.Collections; import java.util.Iterator; import java.util.Map; @@ -23,6 +24,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import org.apache.twill.api.Command; import org.apache.twill.api.ResourceReport; @@ -134,7 +137,7 @@ public Future> updateLogLevels( @Override public Future> updateLogLevels(String runnableName, - Map logLevelsForRunnable) { + Map logLevelsForRunnable) { CompletableFuture> future = new CompletableFuture<>(); future.completeExceptionally( new UnsupportedOperationException("Update log levels is not supported")); @@ -157,6 +160,38 @@ public Future resetRunnableLogLevels(String runnableName, String... lo return future; } + @Override + public void awaitRunning() { + // no-op + } + + @Override + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + // no-op + } + + @Override + public void awaitTerminated() { + // no-op + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + // no-op + } + + @Override + public Service startAsync() { + start(); + return this; + } + + @Override + public Service stopAsync() { + stop(); + return this; + } + @Override protected void startUp() { // no-op @@ -177,6 +212,12 @@ public Future sendCommand(String runnableName, Command command) { return CompletableFuture.completedFuture(command); } + @Nullable + @Override + public Throwable failureCause() { + return null; + } + @Override public void kill() { terminate(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/utils/BatchingConsumer.java b/cdap-common/src/main/java/io/cdap/cdap/common/utils/BatchingConsumer.java index cdda0dd18b21..18aae382f839 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/utils/BatchingConsumer.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/utils/BatchingConsumer.java @@ -79,7 +79,8 @@ public void close() { try { ((AutoCloseable) child).close(); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/utils/ImmutablePair.java b/cdap-common/src/main/java/io/cdap/cdap/common/utils/ImmutablePair.java index 6e3e9d69f9a8..7a8dea597912 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/utils/ImmutablePair.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/utils/ImmutablePair.java @@ -16,6 +16,7 @@ package io.cdap.cdap.common.utils; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; /** @@ -83,7 +84,7 @@ public B getSecond() { */ @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("first", first) .add("second", second) .toString(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/utils/TimeBoundIterator.java b/cdap-common/src/main/java/io/cdap/cdap/common/utils/TimeBoundIterator.java index 4598513fe11b..4839e454faaa 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/utils/TimeBoundIterator.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/utils/TimeBoundIterator.java @@ -17,10 +17,10 @@ package io.cdap.cdap.common.utils; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.AbstractIterator; import java.util.Iterator; +import java.util.concurrent.TimeUnit; /** * An iterator that will act as if there are no more elements if a certain amount of time has @@ -35,7 +35,7 @@ public class TimeBoundIterator extends AbstractIterator { private final Stopwatch stopwatch; public TimeBoundIterator(Iterator delegate, long timeBoundMillis) { - this(delegate, timeBoundMillis, new Stopwatch()); + this(delegate, timeBoundMillis, Stopwatch.createUnstarted()); } public TimeBoundIterator(Iterator delegate, long timeBoundMillis, Stopwatch stopwatch) { @@ -49,7 +49,7 @@ public TimeBoundIterator(Iterator delegate, long timeBoundMillis, Stopwatch s @Override protected T computeNext() { - if (stopwatch.elapsedMillis() < timeBoundMillis && delegate.hasNext()) { + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) < timeBoundMillis && delegate.hasNext()) { return delegate.next(); } return endOfData(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/ZKExtOperations.java b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/ZKExtOperations.java index b01bb6da0cc1..941355af6a48 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/ZKExtOperations.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/ZKExtOperations.java @@ -369,8 +369,7 @@ public void onFailure(Throwable t) { resultFuture.setException(t); } } - }, Threads.SAME_THREAD_EXECUTOR - ); + }, Threads.SAME_THREAD_EXECUTOR); return resultFuture; } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/PartitionReplica.java b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/PartitionReplica.java index f9aae9efccdb..37032f9058f1 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/PartitionReplica.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/PartitionReplica.java @@ -15,6 +15,7 @@ */ package io.cdap.cdap.common.zookeeper.coordination; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.primitives.Ints; import java.util.Comparator; @@ -82,7 +83,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("partition", name) .add("replica", replicaId) .toString(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinator.java b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinator.java index 9bcea77d9a40..4e96a1eaf10d 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinator.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinator.java @@ -412,8 +412,7 @@ public void onFailure(Throwable t) { LOG.error("Failed to save assignment {}", Bytes.toStringBinary(data), t); doNotifyFailed(t); } - }, executor - ); + }, executor); } catch (Exception e) { // Something very wrong LOG.error("Failed to save assignment: {}", assignmentName, e); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorClient.java b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorClient.java index 01aeb7996ad7..ca39477538f4 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorClient.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorClient.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ListenableFuture; import io.cdap.cdap.api.common.Bytes; import io.cdap.cdap.common.zookeeper.ZKExtOperations; @@ -66,7 +67,8 @@ public ResourceRequirement apply(@Nullable NodeData input) { } catch (Throwable t) { LOG.error("Failed to decode resource requirement: {}", Bytes.toStringBinary(input.getData()), t); - throw Throwables.propagate(t); + Throwables.throwIfUnchecked(t); + throw new RuntimeException(t); } } }; @@ -132,8 +134,8 @@ public ListenableFuture fetchRequirement(String resourceNam return Futures.transform( ZKOperations.ignoreError(zkClient.getData(zkPath), KeeperException.NoNodeException.class, null), - NODE_DATA_TO_REQUIREMENT - ); + NODE_DATA_TO_REQUIREMENT, + MoreExecutors.directExecutor()); } /** @@ -150,8 +152,8 @@ public ListenableFuture deleteRequirement(String resourceName) { return Futures.transform( ZKOperations.ignoreError(zkClient.delete(zkPath), KeeperException.NoNodeException.class, resourceName), - Functions.constant(resourceName) - ); + Functions.constant(resourceName), + MoreExecutors.directExecutor()); } /** diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceRequirement.java b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceRequirement.java index 15ad26371137..f01815938ce9 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceRequirement.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/coordination/ResourceRequirement.java @@ -15,6 +15,7 @@ */ package io.cdap.cdap.common.zookeeper.coordination; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; @@ -60,7 +61,7 @@ public Set getPartitions() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("name", name) .add("partitions", partitions) .toString(); @@ -139,7 +140,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("name", name) .add("replicas", replicas) .toString(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoService.java b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoService.java index 15fa2d73582f..d64291c1687c 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoService.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoService.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -81,9 +82,9 @@ public LeaderElectionInfoService(ZKClient zkClient, String leaderElectionPath) { public SortedMap getParticipants(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { try { - Stopwatch stopwatch = new Stopwatch().start(); + Stopwatch stopwatch = Stopwatch.createStarted(); CountDownLatch readyLatch = readyFuture.get(timeout, unit); - long latchTimeout = Math.max(0, stopwatch.elapsedTime(unit) - timeout); + long latchTimeout = Math.max(0, stopwatch.elapsed(unit) - timeout); readyLatch.await(latchTimeout, unit); } catch (ExecutionException e) { // The ready future never throw on get. If this happen, just return an empty map @@ -237,7 +238,8 @@ public void onFailure(Throwable t) { readyLatch.countDown(); } } - }); + }, + MoreExecutors.directExecutor()); } /** diff --git a/cdap-common/src/main/java/io/cdap/cdap/data2/util/TableId.java b/cdap-common/src/main/java/io/cdap/cdap/data2/util/TableId.java index 35c5e51fc9ee..b6391bc80d16 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/data2/util/TableId.java +++ b/cdap-common/src/main/java/io/cdap/cdap/data2/util/TableId.java @@ -16,6 +16,7 @@ package io.cdap.cdap.data2.util; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -70,7 +71,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("namespace", namespace) .add("tableName", tableName) .toString(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java b/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java index e37d4229b634..16b7867d634c 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java +++ b/cdap-common/src/main/java/io/cdap/cdap/extension/AbstractExtensionLoader.java @@ -322,7 +322,8 @@ private ServiceLoader createServiceLoader(File dir) { return input.toURI().toURL(); } catch (MalformedURLException e) { // Shouldn't happen - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } }).toArray(URL[]::new); diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java index a2b5d7e945a9..0757b69e28ae 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java @@ -15,6 +15,7 @@ */ package io.cdap.cdap.internal.app.store; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.gson.Gson; import com.google.gson.annotations.SerializedName; @@ -26,7 +27,6 @@ import io.cdap.cdap.proto.RunRecord; import io.cdap.cdap.proto.id.ProfileId; import io.cdap.cdap.proto.id.ProgramRunId; -import io.cdap.cdap.runtime.spi.provisioner.Cluster; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -186,7 +186,7 @@ public int hashCode() { @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("programRunId", getProgramRunId()) .add("startTs", getStartTs()) .add("runTs", getRunTs()) diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/io/ASMDatumWriterFactory.java b/cdap-common/src/main/java/io/cdap/cdap/internal/io/ASMDatumWriterFactory.java index b715d217e44d..003c01035547 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/io/ASMDatumWriterFactory.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/io/ASMDatumWriterFactory.java @@ -63,7 +63,8 @@ public DatumWriter create(TypeToken type, Schema schema) { return (DatumWriter) writerClass.getConstructor(Schema.class, FieldAccessorFactory.class) .newInstance(schema, fieldAccessorFactory); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/io/DatumWriterGenerator.java b/cdap-common/src/main/java/io/cdap/cdap/internal/io/DatumWriterGenerator.java index d1a43f3c1279..c8d721a12104 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/io/DatumWriterGenerator.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/io/DatumWriterGenerator.java @@ -798,7 +798,8 @@ private void encodeRecord(GeneratorAdapter mg, Schema schema, TypeToken outpu mg.invokeVirtual(classType, getEncodeMethod(fieldType, field.getSchema())); } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/io/FieldAccessorGenerator.java b/cdap-common/src/main/java/io/cdap/cdap/internal/io/FieldAccessorGenerator.java index 49656c470ee7..5b3b47cd0c65 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/io/FieldAccessorGenerator.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/io/FieldAccessorGenerator.java @@ -117,7 +117,7 @@ private void initializeReflectionField(GeneratorAdapter mg, Field field) { this.field = field; } catch (Exception e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } */ Label beginTry = mg.newLabel(); @@ -202,7 +202,7 @@ private void invokeReflection(Method method, String signature) { * try { * // Call method * } catch (IllegalAccessException e) { - * throw Throwables.propagate(e); + * throw new RuntimeException(e); * } */ Label beginTry = mg.newLabel(); diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/io/ReflectionFieldAccessorFactory.java b/cdap-common/src/main/java/io/cdap/cdap/internal/io/ReflectionFieldAccessorFactory.java index 98ae5bef4638..b97967cc05ae 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/io/ReflectionFieldAccessorFactory.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/io/ReflectionFieldAccessorFactory.java @@ -62,7 +62,8 @@ public void set(Object object, T value) { try { finalField.set(object, value); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } @@ -72,7 +73,8 @@ public T get(Object object) { try { return (T) finalField.get(object); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/conf/CConfigurationTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/conf/CConfigurationTest.java index c45f9fe87886..a64806ad4f80 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/conf/CConfigurationTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/conf/CConfigurationTest.java @@ -16,7 +16,6 @@ package io.cdap.cdap.common.conf; -import com.google.common.io.Closeables; import io.cdap.cdap.api.common.Bytes; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -192,7 +191,11 @@ public void testDeprecatedConfigProperties() throws Exception { } // Close the InputStream - Closeables.closeQuietly(resource); + try { + resource.close(); + } catch (Exception ignored) { + // Ignored during cleanup + } } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/conf/ZKPropertyStoreTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/conf/ZKPropertyStoreTest.java index 63396168949b..e06e60466b4d 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/conf/ZKPropertyStoreTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/conf/ZKPropertyStoreTest.java @@ -39,16 +39,16 @@ public class ZKPropertyStoreTest extends PropertyStoreTestBase { @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); } @AfterClass public static void finish() { - zkClient.stopAndWait(); - zkServer.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); + zkServer.stopAsync().awaitTerminated(); } @Override diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/guice/KafkaClientModuleTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/guice/KafkaClientModuleTest.java index b41436c0b7bb..2288796dd1e2 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/guice/KafkaClientModuleTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/guice/KafkaClientModuleTest.java @@ -62,7 +62,7 @@ public class KafkaClientModuleTest { @Before public void beforeTest() throws Exception { zkServer = InMemoryZKServer.builder().setDataDir(TEMP_FOLDER.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); CConfiguration cConf = CConfiguration.create(); String kafkaZkNamespace = cConf.get(KafkaConstants.ConfigKeys.ZOOKEEPER_NAMESPACE_CONFIG); @@ -71,21 +71,21 @@ public void beforeTest() throws Exception { if (kafkaZkNamespace != null) { ZKClientService zkClient = new DefaultZKClientService(zkServer.getConnectionStr(), 2000, null, ImmutableMultimap.of()); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); zkClient.create("/" + kafkaZkNamespace, null, CreateMode.PERSISTENT); - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); kafkaZkConnect += "/" + kafkaZkNamespace; } kafkaServer = createKafkaServer(kafkaZkConnect, TEMP_FOLDER.newFolder()); - kafkaServer.startAndWait(); + kafkaServer.startAsync().awaitRunning(); } @After public void afterTest() { - kafkaServer.stopAndWait(); - zkServer.stopAndWait(); + kafkaServer.stopAsync().awaitTerminated(); + zkServer.stopAsync().awaitTerminated(); } @Test @@ -101,7 +101,7 @@ public void testWithSharedZkClient() throws Exception { // Get the shared zkclient and start it ZKClientService zkClientService = injector.getInstance(ZKClientService.class); - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); final int baseZkConns = getZkConnections(); @@ -109,8 +109,8 @@ public void testWithSharedZkClient() throws Exception { final BrokerService brokerService = injector.getInstance(BrokerService.class); // Start both kafka and broker services, it shouldn't affect the state of the shared zk client - kafkaClientService.startAndWait(); - brokerService.startAndWait(); + kafkaClientService.startAsync().awaitRunning(); + brokerService.startAsync().awaitRunning(); // Shouldn't affect the shared zk client state Assert.assertTrue(zkClientService.isRunning()); @@ -127,8 +127,8 @@ public Boolean call() throws Exception { }, 5L, TimeUnit.SECONDS, 100, TimeUnit.MILLISECONDS); // Stop both, still shouldn't affect the state of the shared zk client - kafkaClientService.stopAndWait(); - brokerService.stopAndWait(); + kafkaClientService.stopAsync().awaitTerminated(); + brokerService.stopAsync().awaitTerminated(); // Still shouldn't affect the shared zk client Assert.assertTrue(zkClientService.isRunning()); @@ -136,7 +136,7 @@ public Boolean call() throws Exception { // It still shouldn't increase the number of zk client connections Assert.assertEquals(baseZkConns, getZkConnections()); - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } @Test @@ -154,7 +154,7 @@ public void testWithDedicatedZkClient() throws Exception { // Get the shared zkclient and start it ZKClientService zkClientService = injector.getInstance(ZKClientService.class); - zkClientService.startAndWait(); + zkClientService.startAsync().awaitRunning(); int baseZkConns = getZkConnections(); @@ -162,12 +162,12 @@ public void testWithDedicatedZkClient() throws Exception { final BrokerService brokerService = injector.getInstance(BrokerService.class); // Start the kafka client, it should increase the zk connections by 1 - kafkaClientService.startAndWait(); + kafkaClientService.startAsync().awaitRunning(); Assert.assertEquals(baseZkConns + 1, getZkConnections()); // Start the broker service, // it shouldn't affect the zk connections, as it share the same zk client with kafka client - brokerService.startAndWait(); + brokerService.startAsync().awaitRunning(); Assert.assertEquals(baseZkConns + 1, getZkConnections()); // Make sure it is talking to Kafka. @@ -182,17 +182,17 @@ public Boolean call() throws Exception { Assert.assertTrue(zkClientService.isRunning()); // Stop the broker service, it shouldn't affect the zk connections, as it is still used by the kafka client - brokerService.stopAndWait(); + brokerService.stopAsync().awaitTerminated(); Assert.assertEquals(baseZkConns + 1, getZkConnections()); // Stop the kafka client, the zk connections should be reduced by 1 - kafkaClientService.stopAndWait(); + kafkaClientService.stopAsync().awaitTerminated(); Assert.assertEquals(baseZkConns, getZkConnections()); // Still shouldn't affect the shared zk client Assert.assertTrue(zkClientService.isRunning()); - zkClientService.stopAndWait(); + zkClientService.stopAsync().awaitTerminated(); } /** diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/guice/ZkDiscoveryModuleTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/guice/ZkDiscoveryModuleTest.java index c809a29c2824..278b6f2ca178 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/guice/ZkDiscoveryModuleTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/guice/ZkDiscoveryModuleTest.java @@ -59,7 +59,7 @@ public class ZkDiscoveryModuleTest { @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(TEMP_FOLDER.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); cConf = CConfiguration.create(); cConf.set(Constants.Zookeeper.QUORUM, zkServer.getConnectionStr()); @@ -68,7 +68,7 @@ public static void init() throws IOException { @AfterClass public static void finish() { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } @Test @@ -80,7 +80,7 @@ public void testMasterDiscovery() { ); ZKClientService zkClient = injector.getInstance(ZKClientService.class); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try { DiscoveryService discoveryService = injector.getInstance(DiscoveryService.class); DiscoveryServiceClient discoveryServiceClient = injector @@ -106,7 +106,7 @@ public void testMasterDiscovery() { } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @@ -119,7 +119,7 @@ public void testProgramDiscovery() { ); ZKClientService zkClient = injector.getInstance(ZKClientService.class); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try { // Register a service using the twill ZKClient. This is to simulate how a user Service program register ProgramId programId = NamespaceId.DEFAULT.app("app").service("service"); @@ -149,7 +149,7 @@ public void testProgramDiscovery() { } } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java index 5781de705674..3993eca11ce6 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/internal/remote/RemoteTaskExecutorTest.java @@ -16,7 +16,8 @@ package io.cdap.cdap.common.internal.remote; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.Service; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.api.metrics.MetricsContext; import io.cdap.cdap.api.service.worker.RemoteExecutionException; @@ -39,6 +40,8 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.twill.common.Cancellable; import org.apache.twill.discovery.InMemoryDiscoveryService; import org.junit.After; @@ -90,7 +93,7 @@ public void modify(ChannelPipeline pipeline) { public void beforeTest() { metricCollectors = new HashMap<>(); mockMetricsCollector = createMockMetricsCollectionService(); - mockMetricsCollector.startAndWait(); + mockMetricsCollector.startAsync().awaitRunning(); registered = discoveryService.register(URIScheme.createDiscoverable(Constants.Service.TASK_WORKER, httpService)); } @@ -110,39 +113,69 @@ public byte[] decrypt(byte[] cipherData, byte[] associatedData) throws CipherExc private MetricsCollectionService createMockMetricsCollectionService() { return new MetricsCollectionService() { + private final AbstractService delegate = new AbstractService() { + @Override + protected void doStart() { + notifyStarted(); + } - @Override - public ListenableFuture start() { - return null; - } + @Override + protected void doStop() { + notifyStopped(); + } + }; @Override - public State startAndWait() { - return null; + public Service startAsync() { + delegate.startAsync(); + return this; } @Override public boolean isRunning() { - return false; + return delegate.isRunning(); } @Override public State state() { - return null; + return delegate.state(); } @Override - public ListenableFuture stop() { - return null; + public Service stopAsync() { + delegate.stopAsync(); + return this; } @Override - public State stopAndWait() { - return null; + public void awaitRunning() { + delegate.awaitRunning(); } @Override - public void addListener(final Listener listener, final Executor executor) {} + public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { + delegate.awaitRunning(timeout, unit); + } + + @Override + public void awaitTerminated() { + delegate.awaitTerminated(); + } + + @Override + public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { + delegate.awaitTerminated(timeout, unit); + } + + @Override + public Throwable failureCause() { + return delegate.failureCause(); + } + + @Override + public void addListener(final Listener listener, final Executor executor) { + delegate.addListener(listener, executor); + } @Override public MetricsContext getContext(Map context) { @@ -205,7 +238,7 @@ public void testFailedMetrics() throws Exception { // Exception thrown in the task executor should be in the exception message in the caller Assert.assertEquals("Invalid", e.getMessage()); } - mockMetricsCollector.stopAndWait(); + mockMetricsCollector.stopAsync().awaitTerminated(); Assert.assertSame(1, metricCollectors.size()); //check the metrics are present @@ -224,7 +257,7 @@ public void testSuccessMetrics() throws Exception { RunnableTaskRequest runnableTaskRequest = RunnableTaskRequest.getBuilder(ValidRunnableClass.class.getName()). withParam("param").withNamespace("testNamespace").build(); remoteTaskExecutor.runTask(runnableTaskRequest); - mockMetricsCollector.stopAndWait(); + mockMetricsCollector.stopAsync().awaitTerminated(); Assert.assertSame(1, metricCollectors.size()); //check the metrics are present @@ -249,7 +282,7 @@ public void testRetryMetrics() throws Exception { } catch (Exception e) { // expected } - mockMetricsCollector.stopAndWait(); + mockMetricsCollector.stopAsync().awaitTerminated(); Assert.assertSame(1, metricCollectors.size()); //check the metrics are present diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/resource/ResourceBalancerServiceTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/resource/ResourceBalancerServiceTest.java index 8a5d4b815491..46c13726714f 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/resource/ResourceBalancerServiceTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/resource/ResourceBalancerServiceTest.java @@ -59,12 +59,12 @@ public ResourceBalancerServiceTest() { @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(TEMP_FOLDER.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); } @AfterClass public static void finish() { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } @Test @@ -72,13 +72,13 @@ public void testResourceBalancerService() throws Exception { // Simple test for resource balancer does react to discovery changes correct // More detailed tests are in ResourceCoordinatorTest, which the ResourceBalancerService depends on ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try (ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient)) { // Test the failure on stop case final TestBalancerService stopFailureService = new TestBalancerService("test", 4, zkClient, discoveryService, discoveryService, false, false); - stopFailureService.startAndWait(); + stopFailureService.startAsync().awaitRunning(); // Should get all four partitions Tasks.waitFor(ImmutableSet.of(0, 1, 2, 3), new Callable>() { @@ -103,20 +103,20 @@ public Integer call() throws Exception { cancellable.cancel(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @Test public void testServiceStartFailure() throws Exception { ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try (ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient)) { // Test the failure on start case final TestBalancerService startFailureService = new TestBalancerService("test", 4, zkClient, discoveryService, discoveryService, true, false); - startFailureService.startAndWait(); + startFailureService.startAsync().awaitRunning(); // The resource balance service should fail Tasks.waitFor(Service.State.FAILED, new Callable() { @@ -126,20 +126,20 @@ public Service.State call() throws Exception { } }, 10, TimeUnit.SECONDS, 100, TimeUnit.MILLISECONDS); } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @Test public void testServiceStopFailure() throws Exception { ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); try (ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClient)) { // Test the failure on stop case final TestBalancerService stopFailureService = new TestBalancerService("test", 4, zkClient, discoveryService, discoveryService, false, true); - stopFailureService.startAndWait(); + stopFailureService.startAsync().awaitRunning(); // Should get four partitions Tasks.waitFor(ImmutableSet.of(0, 1, 2, 3), new Callable>() { @@ -165,7 +165,7 @@ public Service.State call() throws Exception { cancellable.cancel(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/service/CommandPortServiceTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/service/CommandPortServiceTest.java index 6aee3250ddb2..ddbbf6bcea4b 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/service/CommandPortServiceTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/service/CommandPortServiceTest.java @@ -16,8 +16,6 @@ package io.cdap.cdap.common.service; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Service; import java.io.BufferedReader; import java.io.BufferedWriter; @@ -62,19 +60,18 @@ public void testCommandPortServer() throws Exception { .build(); final CountDownLatch stopLatch = new CountDownLatch(1); - Futures.addCallback(server.start(), new FutureCallback() { + server.addListener(new Service.Listener() { @Override - public void onSuccess(Service.State result) { + public void terminated(Service.State from) { stopLatch.countDown(); } @Override - public void onFailure(Throwable t) { + public void failed(Service.State from, Throwable failure) { stopLatch.countDown(); } - }); - // wait a bit for service to start - TimeUnit.SECONDS.sleep(1); + }, Runnable::run); + server.startAsync().awaitRunning(); try { for (int i = 0; i < 10; i++) { @@ -95,7 +92,7 @@ public void onFailure(Throwable t) { } } finally { - server.stopAndWait(); + server.stopAsync().awaitTerminated(); } Assert.assertEquals(10, handler.getCounter()); diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryOnStartFailureServiceTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryOnStartFailureServiceTest.java index b2233dc07704..360027d0aa41 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryOnStartFailureServiceTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryOnStartFailureServiceTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import org.apache.twill.common.Threads; -import org.apache.twill.internal.ServiceListenerAdapter; import org.junit.Assert; import org.junit.Test; @@ -42,7 +41,7 @@ public void testRetrySucceed() throws InterruptedException { Service service = new RetryOnStartFailureService( createServiceSupplier(3, startLatch, new CountDownLatch(1), false), RetryStrategies.fixDelay(10, TimeUnit.MILLISECONDS)); - service.startAndWait(); + service.startAsync().awaitRunning(); Assert.assertTrue(startLatch.await(1, TimeUnit.SECONDS)); } @@ -54,14 +53,14 @@ public void testRetryFail() throws InterruptedException { RetryStrategies.limit(10, RetryStrategies.fixDelay(10, TimeUnit.MILLISECONDS))); final CountDownLatch failureLatch = new CountDownLatch(1); - service.addListener(new ServiceListenerAdapter() { + service.addListener(new Service.Listener() { @Override public void failed(Service.State from, Throwable failure) { failureLatch.countDown(); } }, Threads.SAME_THREAD_EXECUTOR); - service.start(); + service.startAsync(); Assert.assertTrue(failureLatch.await(1, TimeUnit.SECONDS)); Assert.assertFalse(startLatch.await(100, TimeUnit.MILLISECONDS)); } @@ -73,9 +72,9 @@ public void testStopWhileRetrying() throws InterruptedException { Service service = new RetryOnStartFailureService( createServiceSupplier(1000, new CountDownLatch(1), failureLatch, false), RetryStrategies.fixDelay(10, TimeUnit.MILLISECONDS)); - service.startAndWait(); + service.startAsync().awaitRunning(); Assert.assertTrue(failureLatch.await(1, TimeUnit.SECONDS)); - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } @Test @@ -85,7 +84,7 @@ public void testStopFailurePropagate() throws InterruptedException, TimeoutExcep final RetryOnStartFailureService service = new RetryOnStartFailureService( createServiceSupplier(0, startLatch, new CountDownLatch(1), true), RetryStrategies.fixDelay(10, TimeUnit.MILLISECONDS)); - service.startAndWait(); + service.startAsync().awaitRunning(); // block until the underlying service started successfully Assert.assertTrue(startLatch.await(1, TimeUnit.SECONDS)); // As documented in the RetryOnStartFailureService, there is a small race after the @@ -99,7 +98,7 @@ public Boolean call() throws Exception { } }, 5, TimeUnit.SECONDS, 100, TimeUnit.MILLISECONDS); try { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); Assert.fail("Expected failure in stopping"); } catch (Exception e) { Assert.assertEquals("Intentional failure to shutdown", Throwables.getRootCause(e).getMessage()); diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryableScheduledServiceTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryableScheduledServiceTest.java index 2ce4ce226215..6fe1bb793b01 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryableScheduledServiceTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/service/RetryableScheduledServiceTest.java @@ -45,9 +45,9 @@ protected long runTask() { } }; - service.start(); + service.startAsync(); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } @Test @@ -59,12 +59,12 @@ protected long runTask() throws Exception { } }; - service.start(); + service.startAsync(); // Wait for the service to fail Tasks.waitFor(Service.State.FAILED, service::state, 5, TimeUnit.SECONDS, 10, TimeUnit.MILLISECONDS); try { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } catch (Exception e) { // The root cause should be the one throw from the runTask. It should suppressed the retry exhausted exception. Throwable rootCause = Throwables.getRootCause(e); @@ -89,12 +89,12 @@ protected boolean shouldRetry(Exception ex) { } }; - service.start(); + service.startAsync(); // Wait for the service to fail Tasks.waitFor(Service.State.FAILED, service::state, 5, TimeUnit.SECONDS, 10, TimeUnit.MILLISECONDS); try { - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } catch (Exception e) { // The root cause should be the one throw from the runTask. Throwable rootCause = Throwables.getRootCause(e); @@ -118,8 +118,8 @@ protected long runTask() throws Exception { return 1L; } }; - service.start(); + service.startAsync(); Assert.assertTrue(latch.await(3, TimeUnit.SECONDS)); - service.stopAndWait(); + service.stopAsync().awaitTerminated(); } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/ssh/SSHSessionTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/ssh/SSHSessionTest.java index 8fcf8a0014b8..137d24cf3843 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/ssh/SSHSessionTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/ssh/SSHSessionTest.java @@ -18,7 +18,6 @@ import com.google.common.base.Splitter; -import com.google.common.io.Closeables; import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; @@ -131,7 +130,7 @@ public void testLocalPortForwarding() throws Exception { // Starts an echo server for testing the port forwarding EchoServer echoServer = new EchoServer(); - echoServer.startAndWait(); + echoServer.startAsync().awaitRunning(); try { // Creates the DataConsumer for receiving data and validating the lifecycle StringBuilder received = new StringBuilder(); @@ -185,7 +184,7 @@ public void finished() { } } finally { - echoServer.stopAndWait(); + echoServer.stopAsync().awaitTerminated(); } } @@ -193,7 +192,7 @@ public void finished() { public void testForwardingOnSessionClose() throws Exception { EchoServer echoServer = new EchoServer(); - echoServer.startAndWait(); + echoServer.startAsync().awaitRunning(); try { SSHConfig sshConfig = getSSHConfig(); AtomicBoolean finished = new AtomicBoolean(false); @@ -236,7 +235,7 @@ public void finished() { } } finally { - echoServer.stopAndWait(); + echoServer.stopAsync().awaitTerminated(); } } @@ -244,7 +243,7 @@ public void finished() { public void testRemotePortForwarding() throws Exception { EchoServer echoServer = new EchoServer(); - echoServer.startAndWait(); + echoServer.startAsync().awaitRunning(); try { SSHConfig sshConfig = getSSHConfig(); @@ -264,7 +263,7 @@ public void testRemotePortForwarding() throws Exception { } } } finally { - echoServer.stopAndWait(); + echoServer.stopAsync().awaitTerminated(); } } @@ -320,7 +319,11 @@ protected void run() throws IOException { } catch (IOException e) { LOG.error("Exception raised from the EchoServer handling thread", e); } finally { - Closeables.closeQuietly(socket); + try { + socket.close(); + } catch (Exception ignored) { + // Ignored during cleanup + } } }); @@ -337,7 +340,11 @@ protected void run() throws IOException { @Override protected void triggerShutdown() { stopped = true; - Closeables.closeQuietly(serverSocket); + try { + serverSocket.close(); + } catch (Exception ignored) { + // Ignored during cleanup + } } } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/test/MockTwillContext.java b/cdap-common/src/test/java/io/cdap/cdap/common/test/MockTwillContext.java index 061ca4f0e2bd..b6fe043d3735 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/test/MockTwillContext.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/test/MockTwillContext.java @@ -56,7 +56,8 @@ public InetAddress getHost() { try { return InetAddress.getLocalHost(); } catch (UnknownHostException e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/utils/TimeBoundIteratorTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/utils/TimeBoundIteratorTest.java index 34758d971a5a..84635401c982 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/utils/TimeBoundIteratorTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/utils/TimeBoundIteratorTest.java @@ -34,7 +34,7 @@ public class TimeBoundIteratorTest { @Test public void testTimeBoundNotHit() { SettableTicker ticker = new SettableTicker(0); - Stopwatch stopwatch = new Stopwatch(ticker); + Stopwatch stopwatch = Stopwatch.createUnstarted(ticker); List list = new ArrayList<>(); list.add(0); @@ -54,7 +54,7 @@ public void testTimeBoundNotHit() { @Test public void testTimeBoundImmediatelyHit() { SettableTicker ticker = new SettableTicker(0); - Stopwatch stopwatch = new Stopwatch(ticker); + Stopwatch stopwatch = Stopwatch.createUnstarted(ticker); List list = new ArrayList<>(); list.add(0); @@ -70,7 +70,7 @@ public void testTimeBoundImmediatelyHit() { @Test public void testEarlyStop() { SettableTicker ticker = new SettableTicker(0); - Stopwatch stopwatch = new Stopwatch(ticker); + Stopwatch stopwatch = Stopwatch.createUnstarted(ticker); List list = new ArrayList<>(); list.add(0); diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/ZKExtOperationsTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/ZKExtOperationsTest.java index 4abd581060df..434508723f4c 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/ZKExtOperationsTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/ZKExtOperationsTest.java @@ -59,7 +59,7 @@ public Integer decode(byte[] data) throws IOException { @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); } @Test @@ -68,8 +68,8 @@ public void testGetAndSet() throws Exception { ZKClientService zkClient1 = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); ZKClientService zkClient2 = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient1.startAndWait(); - zkClient2.startAndWait(); + zkClient1.startAsync().awaitRunning(); + zkClient2.startAsync().awaitRunning(); // First a node would get created since no node there. ZKExtOperations.updateOrCreate(zkClient1, path, new Function() { @@ -108,7 +108,8 @@ public Integer apply(@Nullable Integer input) { return 3; } } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } throw new IllegalStateException("Illegal input " + input); } @@ -134,15 +135,15 @@ public Integer apply(@Nullable Integer input) { Assert.assertNull(result); - zkClient1.stopAndWait(); - zkClient2.stopAndWait(); + zkClient1.stopAsync().awaitTerminated(); + zkClient2.stopAsync().awaitTerminated(); } @Test public void testCreateOrSet() throws Exception { String path = "/parent/testCreateOrSet"; ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); // Create with "1" Assert.assertEquals(1, ZKExtOperations.createOrSet(zkClient, path, @@ -156,14 +157,14 @@ public void testCreateOrSet() throws Exception { // Should get "2" back Assert.assertEquals(2, INT_CODEC.decode(zkClient.getData(path).get().getData()).intValue()); - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } @Test public void testSetOrCreate() throws Exception { String path = "/parent/testSetOrCreate"; ZKClientService zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); // Create with "1" Assert.assertEquals(1, ZKExtOperations.setOrCreate(zkClient, path, @@ -177,11 +178,11 @@ public void testSetOrCreate() throws Exception { // Should get "2" back Assert.assertEquals(2, INT_CODEC.decode(zkClient.getData(path).get().getData()).intValue()); - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } @AfterClass public static void finish() { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } } diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorTest.java index b7813fb679bc..8cef4bf30471 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/coordination/ResourceCoordinatorTest.java @@ -71,18 +71,18 @@ public void testAssignment() throws InterruptedException, ExecutionException { new ZkClientModule(), new ZkDiscoveryModule()); ZKClientService zkClient = injector.getInstance(ZKClientService.class); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); DiscoveryService discoveryService = injector.getInstance(DiscoveryService.class); try { ResourceCoordinator coordinator = new ResourceCoordinator(zkClient, injector.getInstance(DiscoveryServiceClient.class), new BalancedAssignmentStrategy()); - coordinator.startAndWait(); + coordinator.startAsync().awaitRunning(); try { ResourceCoordinatorClient client = new ResourceCoordinatorClient(zkClient); - client.startAndWait(); + client.startAsync().awaitRunning(); try { // Create a requirement @@ -171,26 +171,26 @@ public void testAssignment() throws InterruptedException, ExecutionException { cancelDiscoverable2.cancel(); } finally { - client.stopAndWait(); + client.stopAsync().awaitTerminated(); } } finally { - coordinator.stopAndWait(); + coordinator.stopAsync().awaitTerminated(); } } finally { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); } @AfterClass public static void finish() { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } private Cancellable subscribe(ResourceCoordinatorClient client, diff --git a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoServiceTest.java b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoServiceTest.java index 7ce62d05dbb2..10b5c4660aaf 100644 --- a/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoServiceTest.java +++ b/cdap-common/src/test/java/io/cdap/cdap/common/zookeeper/election/LeaderElectionInfoServiceTest.java @@ -55,12 +55,12 @@ public class LeaderElectionInfoServiceTest { @BeforeClass public static void init() throws IOException { zkServer = InMemoryZKServer.builder().setDataDir(TEMP_FOLDER.newFolder()).build(); - zkServer.startAndWait(); + zkServer.startAsync().awaitRunning(); } @AfterClass public static void finish() { - zkServer.stopAndWait(); + zkServer.stopAsync().awaitTerminated(); } @Test @@ -71,12 +71,12 @@ public void testParticipants() throws Exception { List zkClients = new ArrayList<>(); ZKClientService infoZKClient = DefaultZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - infoZKClient.startAndWait(); + infoZKClient.startAsync().awaitRunning(); zkClients.add(infoZKClient); // Start the LeaderElectionInfoService LeaderElectionInfoService infoService = new LeaderElectionInfoService(infoZKClient, prefix); - infoService.startAndWait(); + infoService.startAsync().awaitRunning(); // This will timeout as there is no leader election node created yet try { @@ -90,7 +90,7 @@ public void testParticipants() throws Exception { List leaderElections = new ArrayList<>(); for (int i = 0; i < size; i++) { ZKClientService zkClient = DefaultZKClientService.Builder.of(zkServer.getConnectionStr()).build(); - zkClient.startAndWait(); + zkClient.startAsync().awaitRunning(); zkClients.add(zkClient); final int participantId = i; @@ -105,7 +105,7 @@ public void follower() { LOG.info("Follow: {}", participantId); } }); - leaderElection.start(); + leaderElection.startAsync(); leaderElections.add(leaderElection); } @@ -136,7 +136,7 @@ public boolean apply(LeaderElectionInfoService.Participant input) { int expectedSize = size; for (LeaderElection leaderElection : leaderElections) { - leaderElection.stopAndWait(); + leaderElection.stopAsync().awaitTerminated(); Tasks.waitFor(--expectedSize, new Callable() { @Override public Integer call() throws Exception { @@ -150,10 +150,10 @@ public Integer call() throws Exception { Assert.assertTrue(snapshot.isEmpty()); Assert.assertEquals(participants, snapshot); - infoService.stopAndWait(); + infoService.stopAsync().awaitTerminated(); for (ZKClientService zkClient : zkClients) { - zkClient.stopAndWait(); + zkClient.stopAsync().awaitTerminated(); } } } diff --git a/pom.xml b/pom.xml index 64c19fa20c92..437dfec1cac3 100644 --- a/pom.xml +++ b/pom.xml @@ -108,7 +108,7 @@ 1.9.40 1.11.4 1.70 - 0.13.1 + 0.15.0-SNAPSHOT 1.4.0 1.2 3.2.2 @@ -123,7 +123,7 @@ 2.0.0 0.1.0 2.3.1 - 13.0.1 + 32.0.0-jre 4.0 3.3.6 2.2.4 @@ -148,7 +148,7 @@ 3.0.8.Final 2.0 - 2.12.15 + 2.12.18 3.1.0 1.7.15 1.1.1.7 @@ -158,7 +158,7 @@ 0.15.0-incubating 0.8.4 0.9.3 - 1.4.0 + 1.5.0-SNAPSHOT 2.3.6 3.4.5 1.3.1 @@ -302,6 +302,11 @@ guava ${guava.version} + + com.google.guava + failureaccess + 1.0.2 + com.google.inject guice