Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"java.compile.nullAnalysis.mode": "disabled"
}
38 changes: 38 additions & 0 deletions cdap-common/src/main/java/com/google/common/io/InputSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright © 2026 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.common.io;

import java.io.IOException;

/**
* Stub for the removed {@code com.google.common.io.InputSupplier} interface.
* This interface was removed in Guava 21 but is still referenced by the
* {@code common-http} library. This stub allows compilation to succeed.
*
* @param <T> the type of input object supplied
*/
@FunctionalInterface
public interface InputSupplier<T> {

/**
* Returns an input object that can be used for reading.
*
* @return the input object
* @throws IOException if an I/O error occurs
*/
T getInput() throws IOException;
}
38 changes: 38 additions & 0 deletions cdap-common/src/main/java/com/google/common/io/OutputSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright © 2026 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.common.io;

import java.io.IOException;

/**
* Stub for the removed {@code com.google.common.io.OutputSupplier} interface.
* This interface was removed in Guava 21 but is still referenced by the
* {@code twill} library. This stub allows compilation to succeed.
*
* @param <T> the type of output object supplied
*/
@FunctionalInterface
public interface OutputSupplier<T> {

/**
* Returns an output object that can be used for writing.
*
* @return the output object
* @throws IOException if an I/O error occurs
*/
T getOutput() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.cdap.cdap.common;

import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import io.cdap.cdap.api.common.HttpErrorStatusProvider;
import io.cdap.cdap.api.service.ServiceUnavailableException;
Expand Down Expand Up @@ -76,14 +76,14 @@ public void handle(Throwable t, HttpRequest request, HttpResponder responder) {

// If it is not some known exception type, response with 500.
LOG.error("Unexpected error: request={} {} user={}:", request.method().name(), request.getUri(),
Objects.firstNonNull(SecurityRequestContext.getUserId(), "<null>"), t);
MoreObjects.firstNonNull(SecurityRequestContext.getUserId(), "<null>"), t);
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR,
Throwables.getRootCause(t).getMessage());
}

private void logWithTrace(HttpRequest request, Throwable t) {
LOG.trace("Error in handling request={} {} for user={}:", request.method().name(),
request.getUri(),
Objects.firstNonNull(SecurityRequestContext.getUserId(), "<null>"), t);
MoreObjects.firstNonNull(SecurityRequestContext.getUserId(), "<null>"), t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,20 @@

package io.cdap.cdap.common.app;

import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import io.cdap.cdap.api.dataset.Dataset;
import io.cdap.cdap.common.dataset.DatasetClassRewriter;
import io.cdap.cdap.common.lang.ClassLoaders;
import io.cdap.cdap.common.lang.ClassPathResources;
import io.cdap.cdap.common.lang.CombineClassLoader;
import io.cdap.cdap.common.lang.FilterClassLoader;
import io.cdap.cdap.common.lang.GuavaClassRewriter;
import io.cdap.cdap.common.lang.InterceptableClassLoader;
import io.cdap.cdap.common.leveldb.LevelDBClassRewriter;
import io.cdap.cdap.common.security.AuthEnforceRewriter;
import io.cdap.cdap.common.utils.DirUtils;
import io.cdap.cdap.internal.asm.Classes;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
Expand All @@ -53,7 +48,6 @@
public class MainClassLoader extends InterceptableClassLoader {

private static final String DATASET_CLASS_NAME = Dataset.class.getName();
private final GuavaClassRewriter guavaClassRewriter;
private final DatasetClassRewriter datasetRewriter;
private final AuthEnforceRewriter authEnforceRewriter;
private final LevelDBClassRewriter levelDBClassRewriter;
Expand Down Expand Up @@ -137,7 +131,6 @@ public static MainClassLoader createFromContext(FilterClassLoader.Filter filter,
*/
public MainClassLoader(URL[] urls, ClassLoader parent) {
super(urls, parent);
this.guavaClassRewriter = new GuavaClassRewriter();
this.datasetRewriter = new DatasetClassRewriter();
this.authEnforceRewriter = new AuthEnforceRewriter();
this.levelDBClassRewriter = new LevelDBClassRewriter();
Expand All @@ -150,7 +143,8 @@ protected boolean needIntercept(String className) {
try {
return isRewriteNeeded(className);
} catch (IOException e) {
throw Throwables.propagate(e);
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
Comment thread
AbhishekKumar9984 marked this conversation as resolved.
}
}

Expand All @@ -159,10 +153,6 @@ protected boolean needIntercept(String className) {
public byte[] rewriteClass(String className, InputStream input) throws IOException {
byte[] rewrittenCode = null;

if (guavaClassRewriter.needRewrite(className)) {
rewrittenCode = guavaClassRewriter.rewriteClass(className, input);
}
Comment thread
AbhishekKumar9984 marked this conversation as resolved.

if (isDatasetRewriteNeeded(className)) {
rewrittenCode = datasetRewriter.rewriteClass(className, input);
}
Expand All @@ -179,8 +169,7 @@ public byte[] rewriteClass(String className, InputStream input) throws IOExcepti
}

private boolean isRewriteNeeded(String className) throws IOException {
return guavaClassRewriter.needRewrite(className)
|| levelDBClassRewriter.needRewrite(className)
return levelDBClassRewriter.needRewrite(className)
|| isDatasetRewriteNeeded(className)
|| isAuthRewriteNeeded(className);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ private UserGroupInformation createUGI() {
return UserGroupInformation.createRemoteUser(hdfsUser);
}
} catch (Exception e) {
throw Throwables.propagate(e);
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package io.cdap.cdap.common.guice;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import com.google.inject.Injector;
Expand All @@ -32,6 +30,7 @@
import io.cdap.cdap.common.conf.KafkaConstants;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.twill.common.Cancellable;
import org.apache.twill.internal.kafka.client.ZKBrokerService;
Expand Down Expand Up @@ -129,23 +128,90 @@ public ZKClientService get() {
// The logic doesn't need to be sophisticated since it is a private binding and only used by the
// wrapping KafkaClientService and BrokerService, which they will make sure no duplicate calls will be
// made to the start/stop methods.
return new ForwardingZKClientService(zkClientService) {
@Override
public ListenableFuture<State> start() {
if (startedCount.getAndIncrement() == 0) {
return super.start();
}
return Futures.immediateFuture(State.RUNNING);
}
final ZKClientService delegate = zkClientService;
return new RefCountZKClientService(delegate, startedCount);
}
}
Comment thread
AbhishekKumar9984 marked this conversation as resolved.

@Override
public ListenableFuture<State> stop() {
if (startedCount.decrementAndGet() == 0) {
return super.stop();
}
return Futures.immediateFuture(State.TERMINATED);
}
};
/**
* A {@link ZKClientService} wrapper using simple reference counting for start/stop.
* This wrapper is necessary because the underlying {@link ZKClientService} instance is shared between
* {@link DefaultKafkaClientService} and {@link DefaultBrokerService}. Both services manage its lifecycle;
* reference counting ensures the ZK client is only started once and remains open until both services are stopped.
*/
private static final class RefCountZKClientService extends ForwardingZKClientService {

private final ZKClientService delegate;
private final AtomicInteger startedCount;

RefCountZKClientService(ZKClientService delegate, AtomicInteger startedCount) {
super(delegate);
this.delegate = delegate;
this.startedCount = startedCount;
}

@Override
public Service startAsync() {
if (startedCount.getAndIncrement() == 0) {
delegate.startAsync();
}
return this;
}

@Override
public Service stopAsync() {
if (startedCount.decrementAndGet() == 0) {
delegate.stopAsync();
}
return this;
}

@Override
public Throwable failureCause() {
return delegate.failureCause();
}

@Override
public void awaitRunning() {
delegate.awaitRunning();
}

@Override
public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
delegate.awaitRunning(timeout, unit);
}

@Override
public void awaitTerminated() {
// If the delegate was not actually stopped (ref count > 0), return immediately
if (startedCount.get() > 0) {
return;
}
delegate.awaitTerminated();
}

@Override
public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
// If the delegate was not actually stopped (ref count > 0), return immediately
if (startedCount.get() > 0) {
return;
}
delegate.awaitTerminated(timeout, unit);
}

@Override
public State state() {
return delegate.state();
}

@Override
public boolean isRunning() {
return delegate.isRunning();
}

@Override
public void addListener(Listener listener, Executor executor) {
delegate.addListener(listener, executor);
}
}

Expand All @@ -166,12 +232,12 @@ private abstract static class AbstractServiceWithZkClient<T extends Service> ext

@Override
protected final void startUp() throws Exception {
zkClientService.startAndWait();
zkClientService.startAsync().awaitRunning();
try {
delegate.startAndWait();
delegate.startAsync().awaitRunning();
} catch (Exception e) {
try {
zkClientService.stopAndWait();
zkClientService.stopAsync().awaitTerminated();
} catch (Exception se) {
e.addSuppressed(se);
}
Expand All @@ -182,16 +248,16 @@ protected final void startUp() throws Exception {
@Override
protected final void shutDown() throws Exception {
try {
delegate.stopAndWait();
delegate.stopAsync().awaitTerminated();
} catch (Exception e) {
try {
zkClientService.stopAndWait();
zkClientService.stopAsync().awaitTerminated();
} catch (Exception se) {
e.addSuppressed(se);
}
throw e;
}
zkClientService.stopAndWait();
zkClientService.stopAsync().awaitTerminated();
}

protected T getDelegate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.cdap.cdap.common.http;

import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import io.cdap.http.BodyConsumer;
import io.cdap.http.HttpResponder;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -51,7 +50,8 @@ public void chunk(ByteBuf request, HttpResponder responder) {
}
request.readBytes(output, request.readableBytes());
} catch (IOException e) {
throw Throwables.propagate(e);
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}

Expand All @@ -63,7 +63,8 @@ public final void finished(HttpResponder responder) {
}
onFinish(responder, file);
} catch (Exception e) {
throw Throwables.propagate(e);
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
cleanup();
}
Expand All @@ -74,7 +75,11 @@ public final void handleError(Throwable cause) {
try {
LOG.error("Failed to handle upload", cause);
if (output != null) {
Closeables.closeQuietly(output);
try {
output.close();
} catch (Exception ignored) {
// Ignored, as we are already in error handling mode and want to continue cleanup
}
}
onError(cause);
// The netty-http framework will response with 500, no need to response in here.
Expand Down
Loading