Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1OwnerReference;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceBuilder;
import io.kubernetes.client.openapi.models.V1ServiceList;
Expand All @@ -34,10 +35,10 @@
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -78,6 +79,7 @@
private static final String SERVICE_TYPE_CLUSTER_IP = "ClusterIP";
private static final String PAYLOAD_NAME = "cdap.service.payload";

private final String podName;
private final String namespace;
private final String namePrefix;
private final Map<String, DefaultServiceDiscovered> serviceDiscovereds;
Expand All @@ -97,26 +99,24 @@
private final Map<String, String> loadBalancerServiceAnnotations;

/**
* Constructor to create an instance for service discovery on the given
* Kubernetes namespace.
* Constructor to create an instance for service discovery.
*
* @param namespace the Kubernetes namespace to perform service discovery on
* @param namePrefix prefix applies to all service names in k8s
* @param namespace the Kubernetes namespace to perform service discovery on
* @param namePrefix prefix applies to all service names in k8s
* @param podName name of the current pod
* @param podLabels labels of the current pod
* @param ownerReferences owner references to set on created services
* @param apiClientFactory factory to create Kubernetes API clients
* @param loadBalancerServiceList list of services that should be exposed via LoadBalancer
* @param loadBalancerServiceAnnotations annotations to apply to LoadBalancer services
*/
public KubeDiscoveryService(String namespace, String namePrefix,
Map<String, String> podLabels,
List<V1OwnerReference> ownerReferences,
ApiClientFactory apiClientFactory) {
this(namespace, namePrefix, podLabels, ownerReferences, apiClientFactory,
new ArrayList<>(), new HashMap<>());
}

public KubeDiscoveryService(String namespace, String namePrefix,
public KubeDiscoveryService(String namespace, String namePrefix, String podName,

Check warning on line 113 in cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryService.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Constructor has 8 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=cdapio_cdap&issues=AZ5zyXNEgFMCfqFhHdrD&open=AZ5zyXNEgFMCfqFhHdrD&pullRequest=16153
Map<String, String> podLabels, List<V1OwnerReference> ownerReferences,
ApiClientFactory apiClientFactory, List<String> loadBalancerServiceList,
Map<String, String> loadBalancerServiceAnnotations) {
this.namespace = namespace;
this.namePrefix = namePrefix;
this.podName = podName;
this.serviceDiscovereds = new ConcurrentHashMap<>();
this.apiClientFactory = apiClientFactory;
this.podLabels = podLabels;
Expand All @@ -134,6 +134,13 @@

try {
CoreV1Api api = getCoreApi();
if (isPodTerminating(api)) {
LOG.info("Pod {} is terminating. Skipping service registration for {}.", podName,
discoverable.getName());
return () -> {
};
}

while (true) {
Optional<V1Service> currentService = getV1Service(api, serviceName,
discoverable.getName());
Expand Down Expand Up @@ -352,6 +359,9 @@
if (port == null || port != discoverable.getSocketAddress().getPort()) {
return true;
}
if (hasOwnerReferencesChanged(currentService, ownerReferences)) {
return true;
}
// If service type is Cluster IP no need to update. We don't check if the
// service's Cluster IP has changed because the discoverable stores only the
// service's hostname. We rely on Kube DNS to resolve the hostname. Kube DNS
Expand All @@ -371,6 +381,29 @@
return true;
}

private boolean hasOwnerReferencesChanged(V1Service currentService,
List<V1OwnerReference> expectedOwners) {
List<V1OwnerReference> currentOwners = Optional.ofNullable(currentService.getMetadata())
.map(V1ObjectMeta::getOwnerReferences).orElse(Collections.emptyList());

return !new HashSet<>(currentOwners).equals(new HashSet<>(expectedOwners));
}
Comment on lines +384 to +390
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The expectedOwners parameter is annotated with @Nullable, but passing a null value to new HashSet<>(expectedOwners) will throw a NullPointerException. We should safely handle the case where expectedOwners is null by defaulting to an empty list.

Suggested change
private boolean hasOwnerReferencesChanged(V1Service currentService,
@Nullable List<V1OwnerReference> expectedOwners) {
List<V1OwnerReference> currentOwners = Optional.ofNullable(currentService.getMetadata())
.map(V1ObjectMeta::getOwnerReferences).orElse(Collections.emptyList());
return !new HashSet<>(currentOwners).equals(new HashSet<>(expectedOwners));
}
private boolean hasOwnerReferencesChanged(V1Service currentService,
@Nullable List<V1OwnerReference> expectedOwners) {
List<V1OwnerReference> currentOwners = Optional.ofNullable(currentService.getMetadata())
.map(V1ObjectMeta::getOwnerReferences).orElse(Collections.emptyList());
List<V1OwnerReference> expected = expectedOwners == null ? Collections.emptyList() : expectedOwners;
return !new HashSet<>(currentOwners).equals(new HashSet<>(expected));
}


private boolean isPodTerminating(CoreV1Api api) throws ApiException {
try {
V1Pod pod = api.readNamespacedPod(podName, namespace, null);
return Optional.ofNullable(pod.getMetadata()).map(V1ObjectMeta::getDeletionTimestamp)
.isPresent();

} catch (ApiException e) {
// If it is a 404 ApiException, the pod is already gone. Treat as terminating.
if (e.getCode() == HttpURLConnection.HTTP_NOT_FOUND) {
return true;
}
throw e;
}
}
Comment on lines +392 to +405
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If the Kubernetes service account lacks permissions to read pods (e.g., due to RBAC restrictions returning 403 Forbidden) or if there is a transient API error, readNamespacedPod will throw an ApiException. Currently, this exception is rethrown, which will cause the entire service registration to fail. To make this more robust, we should catch ApiException, log a warning, and return false (assuming the pod is not terminating) so that service registration can still proceed. Additionally, we should guard against podName being null.

  private boolean isPodTerminating(CoreV1Api api) {
    if (podName == null) {
      return false;
    }
    try {
      V1Pod pod = api.readNamespacedPod(podName, namespace, null);
      return Optional.ofNullable(pod.getMetadata()).map(V1ObjectMeta::getDeletionTimestamp)
          .isPresent();
    } catch (ApiException e) {
      // If it is a 404 ApiException, the pod is already gone. Treat as terminating.
      if (e.getCode() == HttpURLConnection.HTTP_NOT_FOUND) {
        return true;
      }
      LOG.warn("Failed to read pod status for {}. Assuming pod is not terminating.", podName, e);
      return false;
    }
  }


// Returns the k8s service type for the discoverable.
private String getServiceType(Discoverable discoverable) {
if (loadBalancerServiceList.contains(discoverable.getName())) {
Expand Down Expand Up @@ -563,7 +596,6 @@
payload);
}


/**
* A {@link Thread} that keep watching for changes in service in Kubernetes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public void initialize(MasterEnvironmentContext context)
conf.get(LOAD_BALANCER_SERVICES).split(","));
}
discoveryService = new KubeDiscoveryService(cdapInstallNamespace, "cdap-" + instanceName + "-",
podLabels, podInfo.getOwnerReferences(), apiClientFactory,
podInfo.getName(), podLabels, podInfo.getOwnerReferences(), apiClientFactory,
loadBalancerServiceList,
parseLoadBalancerAnnotations(conf.get(LOAD_BALANCER_SERVICE_ANNOTATIONS)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,34 @@ public void beforeTest() {
kubeDiscoveryService = new KubeDiscoveryService(
"cdap-namespace",
NAME_PREFIX,
"test-pod",
POD_LABELS,
Collections.singletonList(OWNER_REFERENCE),
API_CLIENT_FACTORY,
Collections.singletonList(LOAD_BALANCER_SERVICE_NAME),
LOAD_BALANCER_ANNOTATIONS);
}

private KubeDiscoveryService createDiscoveryService(String namespace, String prefix,
Map<String, String> podLabels) {
return new KubeDiscoveryService(
namespace,
prefix,
"test-pod",
podLabels,
Collections.emptyList(),
API_CLIENT_FACTORY,
Collections.emptyList(),
Collections.emptyMap());
}

@Test
@Ignore
public void testDiscoveryService() throws Exception {
String namespace = "default";
Map<String, String> podLabels = ImmutableMap.of("cdap.container", "test");
try (KubeDiscoveryService service = new KubeDiscoveryService(namespace,
"cdap-test-",
podLabels, Collections.emptyList(),
API_CLIENT_FACTORY)) {
try (KubeDiscoveryService service = createDiscoveryService(namespace, "cdap-test-",
podLabels)) {
// Watch for changes
ServiceDiscovered serviceDiscovered = service.discover("test.service");

Expand Down Expand Up @@ -210,10 +221,8 @@ public void testDiscoveryService() throws Exception {
public void testCloseWatchRace() throws Exception {
String namespace = "default";
String prefix = "cdap-test-";
try (KubeDiscoveryService service = new KubeDiscoveryService(namespace,
prefix,
ImmutableMap.of("cdap.container", "test"),
Collections.emptyList(), API_CLIENT_FACTORY)) {
try (KubeDiscoveryService service = createDiscoveryService(namespace, prefix,
ImmutableMap.of("cdap.container", "test"))) {
// Register two services first
service.register(new Discoverable("test1",
new InetSocketAddress(InetAddress.getLoopbackAddress(), 1234)));
Expand Down Expand Up @@ -260,10 +269,8 @@ public void testCloseWatchRace() throws Exception {
public void testCloseWatchRaceDifferentThreads() throws Exception {
String namespace = "default";
String prefix = "cdap-test-";
try (KubeDiscoveryService service = new KubeDiscoveryService(namespace,
prefix,
ImmutableMap.of("cdap.container", "test"),
Collections.emptyList(), API_CLIENT_FACTORY)) {
try (KubeDiscoveryService service = createDiscoveryService(namespace, prefix,
ImmutableMap.of("cdap.container", "test"))) {
// Register two services first
service.register(new Discoverable("test1",
new InetSocketAddress(InetAddress.getLoopbackAddress(), 1234)));
Expand Down Expand Up @@ -471,6 +478,50 @@ public void testIsServiceUpdateNeededServiceTypeChange() {
.get(0), currentService, discoverable));
}

@Test
public void testIsServiceUpdateNeededOwnerChange() {
V1Service currentService = getClusterIpService();
V1OwnerReference differentOwner = new V1OwnerReferenceBuilder(OWNER_REFERENCE).withUid(
"different-uid").build();
currentService.getMetadata().setOwnerReferences(Collections.singletonList(differentOwner));

Discoverable discoverable = new Discoverable(CLUSTER_IP_SERVICE_NAME,
InetSocketAddress.createUnresolved("cdap.io", SERVICE_PORT));

Assert.assertTrue(
kubeDiscoveryService.isServiceUpdateNeeded(currentService.getSpec().getPorts().get(0),
currentService, discoverable));
}

@Test
public void testIsServiceUpdateNeededNoOwnerOnService() {
V1Service currentService = getClusterIpService();
currentService.getMetadata().setOwnerReferences(null);

Discoverable discoverable = new Discoverable(CLUSTER_IP_SERVICE_NAME,
InetSocketAddress.createUnresolved("cdap.io", SERVICE_PORT));

Assert.assertTrue(
kubeDiscoveryService.isServiceUpdateNeeded(currentService.getSpec().getPorts().get(0),
currentService, discoverable));
}

@Test
public void testIsServiceUpdateNeededBothNoOwners() {
KubeDiscoveryService noOwnerService = createDiscoveryService("cdap-namespace", NAME_PREFIX,
POD_LABELS);

V1Service currentService = getClusterIpService();
currentService.getMetadata().setOwnerReferences(null);

Discoverable discoverable = new Discoverable(CLUSTER_IP_SERVICE_NAME,
InetSocketAddress.createUnresolved("cdap.io", SERVICE_PORT));

Assert.assertFalse(
noOwnerService.isServiceUpdateNeeded(currentService.getSpec().getPorts().get(0),
currentService, discoverable));
}

private V1Service getLoadBalancerService() {
HashMap<String, String> annotations = new HashMap<>(
LOAD_BALANCER_ANNOTATIONS);
Expand Down
Loading