Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1OwnerReference;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceBuilder;
import io.kubernetes.client.openapi.models.V1ServiceList;
Expand All @@ -34,10 +35,10 @@
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -78,6 +79,7 @@
private static final String SERVICE_TYPE_CLUSTER_IP = "ClusterIP";
private static final String PAYLOAD_NAME = "cdap.service.payload";

private final String podName;
private final String namespace;
private final String namePrefix;
private final Map<String, DefaultServiceDiscovered> serviceDiscovereds;
Expand All @@ -97,26 +99,24 @@
private final Map<String, String> loadBalancerServiceAnnotations;

/**
* Constructor to create an instance for service discovery on the given
* Kubernetes namespace.
* Constructor to create an instance for service discovery.
*
* @param namespace the Kubernetes namespace to perform service discovery on
* @param namePrefix prefix applies to all service names in k8s
* @param namespace the Kubernetes namespace to perform service discovery on
* @param namePrefix prefix applies to all service names in k8s
* @param podName name of the current pod
* @param podLabels labels of the current pod
* @param ownerReferences owner references to set on created services
* @param apiClientFactory factory to create Kubernetes API clients
* @param loadBalancerServiceList list of services that should be exposed via LoadBalancer
* @param loadBalancerServiceAnnotations annotations to apply to LoadBalancer services
*/
public KubeDiscoveryService(String namespace, String namePrefix,
Map<String, String> podLabels,
List<V1OwnerReference> ownerReferences,
ApiClientFactory apiClientFactory) {
this(namespace, namePrefix, podLabels, ownerReferences, apiClientFactory,
new ArrayList<>(), new HashMap<>());
}

public KubeDiscoveryService(String namespace, String namePrefix,
public KubeDiscoveryService(String namespace, String namePrefix, String podName,

Check warning on line 113 in cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryService.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Constructor has 8 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=cdapio_cdap&issues=AZ5z0R2TxKZvTtDNnHHe&open=AZ5z0R2TxKZvTtDNnHHe&pullRequest=16151
Map<String, String> podLabels, List<V1OwnerReference> ownerReferences,
ApiClientFactory apiClientFactory, List<String> loadBalancerServiceList,
Map<String, String> loadBalancerServiceAnnotations) {
this.namespace = namespace;
this.namePrefix = namePrefix;
this.podName = podName;
this.serviceDiscovereds = new ConcurrentHashMap<>();
this.apiClientFactory = apiClientFactory;
this.podLabels = podLabels;
Expand All @@ -134,6 +134,13 @@

try {
CoreV1Api api = getCoreApi();
if (isPodTerminating(api)) {
LOG.info("Pod {} is terminating. Skipping service registration for {}.", podName,
discoverable.getName());
return () -> {
};
}

while (true) {
Optional<V1Service> currentService = getV1Service(api, serviceName,
discoverable.getName());
Expand Down Expand Up @@ -352,6 +359,9 @@
if (port == null || port != discoverable.getSocketAddress().getPort()) {
return true;
}
if (hasOwnerReferencesChanged(currentService, ownerReferences)) {
return true;
}
// If service type is Cluster IP no need to update. We don't check if the
// service's Cluster IP has changed because the discoverable stores only the
// service's hostname. We rely on Kube DNS to resolve the hostname. Kube DNS
Expand All @@ -371,6 +381,29 @@
return true;
}

private boolean hasOwnerReferencesChanged(V1Service currentService,
List<V1OwnerReference> expectedOwners) {
List<V1OwnerReference> currentOwners = Optional.ofNullable(currentService.getMetadata())
.map(V1ObjectMeta::getOwnerReferences).orElse(Collections.emptyList());

return !new HashSet<>(currentOwners).equals(new HashSet<>(expectedOwners));
}

private boolean isPodTerminating(CoreV1Api api) throws ApiException {
try {
V1Pod pod = api.readNamespacedPod(podName, namespace, null);
return Optional.ofNullable(pod.getMetadata()).map(V1ObjectMeta::getDeletionTimestamp)
.isPresent();

} catch (ApiException e) {
// If it is a 404 ApiException, the pod is already gone. Treat as terminating.
if (e.getCode() == HttpURLConnection.HTTP_NOT_FOUND) {
return true;
}
throw e;
}
}

// Returns the k8s service type for the discoverable.
private String getServiceType(Discoverable discoverable) {
if (loadBalancerServiceList.contains(discoverable.getName())) {
Expand Down Expand Up @@ -399,7 +432,7 @@
@VisibleForTesting
V1Service createService(String serviceName,
Discoverable discoverable) {
V1Service service = new V1Service();

Check warning on line 435 in cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryService.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck

Distance between variable 'service' declaration and its first usage is 5, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).
V1ObjectMeta meta = new V1ObjectMeta();
meta.setName(serviceName);
meta.setLabels(Collections.singletonMap(SERVICE_LABEL,
Expand Down Expand Up @@ -536,9 +569,9 @@
}

private Optional<String> getLoadBalancerIp(V1Service service) {
if (service.getStatus() == null ||

Check warning on line 572 in cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryService.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.whitespace.OperatorWrapCheck

'||' should be on a new line.
service.getStatus().getLoadBalancer() == null ||

Check warning on line 573 in cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryService.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.whitespace.OperatorWrapCheck

'||' should be on a new line.
service.getStatus().getLoadBalancer().getIngress() == null ||

Check warning on line 574 in cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryService.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.whitespace.OperatorWrapCheck

'||' should be on a new line.
service.getStatus().getLoadBalancer().getIngress().isEmpty()) {
return Optional.empty();
}
Expand All @@ -563,7 +596,6 @@
payload);
}


/**
* A {@link Thread} that keep watching for changes in service in Kubernetes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public void initialize(MasterEnvironmentContext context)
conf.get(LOAD_BALANCER_SERVICES).split(","));
}
discoveryService = new KubeDiscoveryService(cdapInstallNamespace, "cdap-" + instanceName + "-",
podLabels, podInfo.getOwnerReferences(), apiClientFactory,
podInfo.getName(), podLabels, podInfo.getOwnerReferences(), apiClientFactory,
loadBalancerServiceList,
parseLoadBalancerAnnotations(conf.get(LOAD_BALANCER_SERVICE_ANNOTATIONS)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,34 @@
kubeDiscoveryService = new KubeDiscoveryService(
"cdap-namespace",
NAME_PREFIX,
"test-pod",
POD_LABELS,
Collections.singletonList(OWNER_REFERENCE),
API_CLIENT_FACTORY,
Collections.singletonList(LOAD_BALANCER_SERVICE_NAME),
LOAD_BALANCER_ANNOTATIONS);
}

private KubeDiscoveryService createDiscoveryService(String namespace, String prefix,
Map<String, String> podLabels) {
return new KubeDiscoveryService(
namespace,
prefix,
"test-pod",
podLabels,
Collections.emptyList(),
API_CLIENT_FACTORY,
Collections.emptyList(),
Collections.emptyMap());
}

@Test
@Ignore
public void testDiscoveryService() throws Exception {
String namespace = "default";
Map<String, String> podLabels = ImmutableMap.of("cdap.container", "test");
try (KubeDiscoveryService service = new KubeDiscoveryService(namespace,
"cdap-test-",
podLabels, Collections.emptyList(),
API_CLIENT_FACTORY)) {
try (KubeDiscoveryService service = createDiscoveryService(namespace, "cdap-test-",
podLabels)) {
// Watch for changes
ServiceDiscovered serviceDiscovered = service.discover("test.service");

Expand Down Expand Up @@ -210,10 +221,8 @@
public void testCloseWatchRace() throws Exception {
String namespace = "default";
String prefix = "cdap-test-";
try (KubeDiscoveryService service = new KubeDiscoveryService(namespace,
prefix,
ImmutableMap.of("cdap.container", "test"),
Collections.emptyList(), API_CLIENT_FACTORY)) {
try (KubeDiscoveryService service = createDiscoveryService(namespace, prefix,
ImmutableMap.of("cdap.container", "test"))) {
// Register two services first
service.register(new Discoverable("test1",
new InetSocketAddress(InetAddress.getLoopbackAddress(), 1234)));
Expand Down Expand Up @@ -260,10 +269,8 @@
public void testCloseWatchRaceDifferentThreads() throws Exception {
String namespace = "default";
String prefix = "cdap-test-";
try (KubeDiscoveryService service = new KubeDiscoveryService(namespace,
prefix,
ImmutableMap.of("cdap.container", "test"),
Collections.emptyList(), API_CLIENT_FACTORY)) {
try (KubeDiscoveryService service = createDiscoveryService(namespace, prefix,
ImmutableMap.of("cdap.container", "test"))) {
// Register two services first
service.register(new Discoverable("test1",
new InetSocketAddress(InetAddress.getLoopbackAddress(), 1234)));
Expand Down Expand Up @@ -377,7 +384,7 @@

@Test
public void updateClusterIpToLoadBalancer() {
Discoverable discoverable = new Discoverable(LOAD_BALANCER_SERVICE_NAME,

Check warning on line 387 in cdap-kubernetes/src/test/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryServiceTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck

Distance between variable 'discoverable' declaration and its first usage is 4, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).
InetSocketAddress.createUnresolved(LOAD_BALANCER_IP, SERVICE_PORT),
TEST_PAYLOAD.getBytes(StandardCharsets.UTF_8));
V1Service currentService = getClusterIpService();
Expand All @@ -396,7 +403,7 @@

@Test
public void updateLoadBalancerToClusterIp() {
Discoverable discoverable = new Discoverable(CLUSTER_IP_SERVICE_NAME,

Check warning on line 406 in cdap-kubernetes/src/test/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryServiceTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck

Distance between variable 'discoverable' declaration and its first usage is 5, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).
InetSocketAddress.createUnresolved("cdap.io", SERVICE_PORT));
V1Service currentService = getLoadBalancerService();
// Change the Service Name.
Expand Down Expand Up @@ -471,6 +478,50 @@
.get(0), currentService, discoverable));
}

@Test
public void testIsServiceUpdateNeededOwnerChange() {
V1Service currentService = getClusterIpService();
V1OwnerReference differentOwner = new V1OwnerReferenceBuilder(OWNER_REFERENCE).withUid(
"different-uid").build();
currentService.getMetadata().setOwnerReferences(Collections.singletonList(differentOwner));

Discoverable discoverable = new Discoverable(CLUSTER_IP_SERVICE_NAME,
InetSocketAddress.createUnresolved("cdap.io", SERVICE_PORT));

Assert.assertTrue(
kubeDiscoveryService.isServiceUpdateNeeded(currentService.getSpec().getPorts().get(0),
currentService, discoverable));
}

@Test
public void testIsServiceUpdateNeededNoOwnerOnService() {
V1Service currentService = getClusterIpService();
currentService.getMetadata().setOwnerReferences(null);

Discoverable discoverable = new Discoverable(CLUSTER_IP_SERVICE_NAME,
InetSocketAddress.createUnresolved("cdap.io", SERVICE_PORT));

Assert.assertTrue(
kubeDiscoveryService.isServiceUpdateNeeded(currentService.getSpec().getPorts().get(0),
currentService, discoverable));
}

@Test
public void testIsServiceUpdateNeededBothNoOwners() {
KubeDiscoveryService noOwnerService = createDiscoveryService("cdap-namespace", NAME_PREFIX,
POD_LABELS);

V1Service currentService = getClusterIpService();
currentService.getMetadata().setOwnerReferences(null);

Discoverable discoverable = new Discoverable(CLUSTER_IP_SERVICE_NAME,
InetSocketAddress.createUnresolved("cdap.io", SERVICE_PORT));

Assert.assertFalse(
noOwnerService.isServiceUpdateNeeded(currentService.getSpec().getPorts().get(0),
currentService, discoverable));
}

private V1Service getLoadBalancerService() {
HashMap<String, String> annotations = new HashMap<>(
LOAD_BALANCER_ANNOTATIONS);
Expand Down
Loading