diff --git a/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryService.java b/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryService.java index 3ba3f67a0364..cd78998331bd 100644 --- a/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryService.java +++ b/cdap-kubernetes/src/main/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryService.java @@ -25,6 +25,7 @@ import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1OwnerReference; +import io.kubernetes.client.openapi.models.V1Pod; import io.kubernetes.client.openapi.models.V1Service; import io.kubernetes.client.openapi.models.V1ServiceBuilder; import io.kubernetes.client.openapi.models.V1ServiceList; @@ -34,10 +35,10 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Base64; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -78,6 +79,7 @@ public class KubeDiscoveryService implements DiscoveryService, private static final String SERVICE_TYPE_CLUSTER_IP = "ClusterIP"; private static final String PAYLOAD_NAME = "cdap.service.payload"; + private final String podName; private final String namespace; private final String namePrefix; private final Map serviceDiscovereds; @@ -97,26 +99,24 @@ public class KubeDiscoveryService implements DiscoveryService, private final Map loadBalancerServiceAnnotations; /** - * Constructor to create an instance for service discovery on the given - * Kubernetes namespace. + * Constructor to create an instance for service discovery. * - * @param namespace the Kubernetes namespace to perform service discovery on - * @param namePrefix prefix applies to all service names in k8s + * @param namespace the Kubernetes namespace to perform service discovery on + * @param namePrefix prefix applies to all service names in k8s + * @param podName name of the current pod + * @param podLabels labels of the current pod + * @param ownerReferences owner references to set on created services + * @param apiClientFactory factory to create Kubernetes API clients + * @param loadBalancerServiceList list of services that should be exposed via LoadBalancer + * @param loadBalancerServiceAnnotations annotations to apply to LoadBalancer services */ - public KubeDiscoveryService(String namespace, String namePrefix, - Map podLabels, - List ownerReferences, - ApiClientFactory apiClientFactory) { - this(namespace, namePrefix, podLabels, ownerReferences, apiClientFactory, - new ArrayList<>(), new HashMap<>()); - } - - public KubeDiscoveryService(String namespace, String namePrefix, + public KubeDiscoveryService(String namespace, String namePrefix, String podName, Map podLabels, List ownerReferences, ApiClientFactory apiClientFactory, List loadBalancerServiceList, Map loadBalancerServiceAnnotations) { this.namespace = namespace; this.namePrefix = namePrefix; + this.podName = podName; this.serviceDiscovereds = new ConcurrentHashMap<>(); this.apiClientFactory = apiClientFactory; this.podLabels = podLabels; @@ -134,6 +134,13 @@ public Cancellable register(Discoverable discoverable) { try { CoreV1Api api = getCoreApi(); + if (isPodTerminating(api)) { + LOG.info("Pod {} is terminating. Skipping service registration for {}.", podName, + discoverable.getName()); + return () -> { + }; + } + while (true) { Optional currentService = getV1Service(api, serviceName, discoverable.getName()); @@ -352,6 +359,9 @@ boolean isServiceUpdateNeeded(V1ServicePort servicePort, if (port == null || port != discoverable.getSocketAddress().getPort()) { return true; } + if (hasOwnerReferencesChanged(currentService, ownerReferences)) { + return true; + } // If service type is Cluster IP no need to update. We don't check if the // service's Cluster IP has changed because the discoverable stores only the // service's hostname. We rely on Kube DNS to resolve the hostname. Kube DNS @@ -371,6 +381,29 @@ boolean isServiceUpdateNeeded(V1ServicePort servicePort, return true; } + private boolean hasOwnerReferencesChanged(V1Service currentService, + List expectedOwners) { + List currentOwners = Optional.ofNullable(currentService.getMetadata()) + .map(V1ObjectMeta::getOwnerReferences).orElse(Collections.emptyList()); + + return !new HashSet<>(currentOwners).equals(new HashSet<>(expectedOwners)); + } + + private boolean isPodTerminating(CoreV1Api api) throws ApiException { + try { + V1Pod pod = api.readNamespacedPod(podName, namespace, null); + return Optional.ofNullable(pod.getMetadata()).map(V1ObjectMeta::getDeletionTimestamp) + .isPresent(); + + } catch (ApiException e) { + // If it is a 404 ApiException, the pod is already gone. Treat as terminating. + if (e.getCode() == HttpURLConnection.HTTP_NOT_FOUND) { + return true; + } + throw e; + } + } + // Returns the k8s service type for the discoverable. private String getServiceType(Discoverable discoverable) { if (loadBalancerServiceList.contains(discoverable.getName())) { @@ -563,7 +596,6 @@ private Discoverable createDiscoverable(String name, payload); } - /** * A {@link Thread} that keep watching for changes in service in Kubernetes. */ diff --git a/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java b/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java index 21195f91328f..2e3717c65016 100644 --- a/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java +++ b/cdap-kubernetes/src/main/java/io/cdap/cdap/master/environment/k8s/KubeMasterEnvironment.java @@ -347,7 +347,7 @@ public void initialize(MasterEnvironmentContext context) conf.get(LOAD_BALANCER_SERVICES).split(",")); } discoveryService = new KubeDiscoveryService(cdapInstallNamespace, "cdap-" + instanceName + "-", - podLabels, podInfo.getOwnerReferences(), apiClientFactory, + podInfo.getName(), podLabels, podInfo.getOwnerReferences(), apiClientFactory, loadBalancerServiceList, parseLoadBalancerAnnotations(conf.get(LOAD_BALANCER_SERVICE_ANNOTATIONS))); diff --git a/cdap-kubernetes/src/test/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryServiceTest.java b/cdap-kubernetes/src/test/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryServiceTest.java index 0d44c1a3aeb0..183f1934a2dc 100644 --- a/cdap-kubernetes/src/test/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryServiceTest.java +++ b/cdap-kubernetes/src/test/java/io/cdap/cdap/k8s/discovery/KubeDiscoveryServiceTest.java @@ -97,6 +97,7 @@ public void beforeTest() { kubeDiscoveryService = new KubeDiscoveryService( "cdap-namespace", NAME_PREFIX, + "test-pod", POD_LABELS, Collections.singletonList(OWNER_REFERENCE), API_CLIENT_FACTORY, @@ -104,16 +105,26 @@ public void beforeTest() { LOAD_BALANCER_ANNOTATIONS); } + private KubeDiscoveryService createDiscoveryService(String namespace, String prefix, + Map podLabels) { + return new KubeDiscoveryService( + namespace, + prefix, + "test-pod", + podLabels, + Collections.emptyList(), + API_CLIENT_FACTORY, + Collections.emptyList(), + Collections.emptyMap()); + } @Test @Ignore public void testDiscoveryService() throws Exception { String namespace = "default"; Map podLabels = ImmutableMap.of("cdap.container", "test"); - try (KubeDiscoveryService service = new KubeDiscoveryService(namespace, - "cdap-test-", - podLabels, Collections.emptyList(), - API_CLIENT_FACTORY)) { + try (KubeDiscoveryService service = createDiscoveryService(namespace, "cdap-test-", + podLabels)) { // Watch for changes ServiceDiscovered serviceDiscovered = service.discover("test.service"); @@ -210,10 +221,8 @@ public void testDiscoveryService() throws Exception { public void testCloseWatchRace() throws Exception { String namespace = "default"; String prefix = "cdap-test-"; - try (KubeDiscoveryService service = new KubeDiscoveryService(namespace, - prefix, - ImmutableMap.of("cdap.container", "test"), - Collections.emptyList(), API_CLIENT_FACTORY)) { + try (KubeDiscoveryService service = createDiscoveryService(namespace, prefix, + ImmutableMap.of("cdap.container", "test"))) { // Register two services first service.register(new Discoverable("test1", new InetSocketAddress(InetAddress.getLoopbackAddress(), 1234))); @@ -260,10 +269,8 @@ public void testCloseWatchRace() throws Exception { public void testCloseWatchRaceDifferentThreads() throws Exception { String namespace = "default"; String prefix = "cdap-test-"; - try (KubeDiscoveryService service = new KubeDiscoveryService(namespace, - prefix, - ImmutableMap.of("cdap.container", "test"), - Collections.emptyList(), API_CLIENT_FACTORY)) { + try (KubeDiscoveryService service = createDiscoveryService(namespace, prefix, + ImmutableMap.of("cdap.container", "test"))) { // Register two services first service.register(new Discoverable("test1", new InetSocketAddress(InetAddress.getLoopbackAddress(), 1234))); @@ -471,6 +478,50 @@ public void testIsServiceUpdateNeededServiceTypeChange() { .get(0), currentService, discoverable)); } + @Test + public void testIsServiceUpdateNeededOwnerChange() { + V1Service currentService = getClusterIpService(); + V1OwnerReference differentOwner = new V1OwnerReferenceBuilder(OWNER_REFERENCE).withUid( + "different-uid").build(); + currentService.getMetadata().setOwnerReferences(Collections.singletonList(differentOwner)); + + Discoverable discoverable = new Discoverable(CLUSTER_IP_SERVICE_NAME, + InetSocketAddress.createUnresolved("cdap.io", SERVICE_PORT)); + + Assert.assertTrue( + kubeDiscoveryService.isServiceUpdateNeeded(currentService.getSpec().getPorts().get(0), + currentService, discoverable)); + } + + @Test + public void testIsServiceUpdateNeededNoOwnerOnService() { + V1Service currentService = getClusterIpService(); + currentService.getMetadata().setOwnerReferences(null); + + Discoverable discoverable = new Discoverable(CLUSTER_IP_SERVICE_NAME, + InetSocketAddress.createUnresolved("cdap.io", SERVICE_PORT)); + + Assert.assertTrue( + kubeDiscoveryService.isServiceUpdateNeeded(currentService.getSpec().getPorts().get(0), + currentService, discoverable)); + } + + @Test + public void testIsServiceUpdateNeededBothNoOwners() { + KubeDiscoveryService noOwnerService = createDiscoveryService("cdap-namespace", NAME_PREFIX, + POD_LABELS); + + V1Service currentService = getClusterIpService(); + currentService.getMetadata().setOwnerReferences(null); + + Discoverable discoverable = new Discoverable(CLUSTER_IP_SERVICE_NAME, + InetSocketAddress.createUnresolved("cdap.io", SERVICE_PORT)); + + Assert.assertFalse( + noOwnerService.isServiceUpdateNeeded(currentService.getSpec().getPorts().get(0), + currentService, discoverable)); + } + private V1Service getLoadBalancerService() { HashMap annotations = new HashMap<>( LOAD_BALANCER_ANNOTATIONS);