diff --git a/deployments/kubernetes/chart/reloader/templates/clusterrole.yaml b/deployments/kubernetes/chart/reloader/templates/clusterrole.yaml index bd14dfeb7..a800f655f 100644 --- a/deployments/kubernetes/chart/reloader/templates/clusterrole.yaml +++ b/deployments/kubernetes/chart/reloader/templates/clusterrole.yaml @@ -96,6 +96,14 @@ rules: - list - get {{- end}} +{{- if and .Values.reloader.ignoreCronJobs .Values.reloader.ignoreJobs }}{{- else }} + - apiGroups: + - "" + resources: + - pods + verbs: + - deletecollection +{{- end }} {{- if .Values.reloader.enableHA }} - apiGroups: - "coordination.k8s.io" diff --git a/deployments/kubernetes/manifests/clusterrole.yaml b/deployments/kubernetes/manifests/clusterrole.yaml index f2fc68104..c8ff7f19b 100644 --- a/deployments/kubernetes/manifests/clusterrole.yaml +++ b/deployments/kubernetes/manifests/clusterrole.yaml @@ -51,6 +51,12 @@ rules: - delete - list - get + - apiGroups: + - "" + resources: + - pods + verbs: + - deletecollection - apiGroups: - "" resources: diff --git a/internal/pkg/callbacks/rolling_upgrade.go b/internal/pkg/callbacks/rolling_upgrade.go index 13e5a63cd..9542e7795 100644 --- a/internal/pkg/callbacks/rolling_upgrade.go +++ b/internal/pkg/callbacks/rolling_upgrade.go @@ -16,8 +16,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" patchtypes "k8s.io/apimachinery/pkg/types" - "maps" - argorolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" ) @@ -42,7 +40,7 @@ type UpdateFunc func(kube.Clients, string, runtime.Object) error // PatchFunc performs the resource patch type PatchFunc func(kube.Clients, string, runtime.Object, patchtypes.PatchType, []byte) error -// PatchTemplateFunc is a generic func to return strategic merge JSON patch template +// PatchTemplatesFunc is a generic func to return strategic merge JSON patch template type PatchTemplatesFunc func() PatchTemplates // AnnotationsFunc is a generic func to return annotations @@ -442,26 +440,20 @@ func PatchDeployment(clients kube.Clients, namespace string, resource runtime.Ob return err } -// CreateJobFromCronjob performs rolling upgrade on cronjob -func CreateJobFromCronjob(clients kube.Clients, namespace string, resource runtime.Object) error { +// RestartRunningCronjobPods restarts all pods currently running cronjob jobs +func RestartRunningCronjobPods(clients kube.Clients, namespace string, resource runtime.Object) error { cronJob := resource.(*batchv1.CronJob) - - annotations := make(map[string]string) - annotations["cronjob.kubernetes.io/instantiate"] = "manual" - maps.Copy(annotations, cronJob.Spec.JobTemplate.Annotations) - - job := &batchv1.Job{ - ObjectMeta: meta_v1.ObjectMeta{ - GenerateName: cronJob.Name + "-", - Namespace: cronJob.Namespace, - Annotations: annotations, - Labels: cronJob.Spec.JobTemplate.Labels, - OwnerReferences: []meta_v1.OwnerReference{*meta_v1.NewControllerRef(cronJob, batchv1.SchemeGroupVersion.WithKind("CronJob"))}, - }, - Spec: cronJob.Spec.JobTemplate.Spec, - } - _, err := clients.KubernetesClient.BatchV1().Jobs(namespace).Create(context.TODO(), job, meta_v1.CreateOptions{FieldManager: "Reloader"}) - return err + for _, job := range cronJob.Status.Active { + logrus.Debugf("Deleting running pods for active Job %s/%s", job.Namespace, job.Name) + listOpts := meta_v1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", batchv1.JobNameLabel, job.Name), + FieldSelector: "status.phase=Running", + } + if err := clients.KubernetesClient.CoreV1().Pods(namespace).DeleteCollection(context.TODO(), meta_v1.DeleteOptions{}, listOpts); err != nil { + return err + } + } + return nil } func PatchCronJob(clients kube.Clients, namespace string, resource runtime.Object, patchType patchtypes.PatchType, bytes []byte) error { diff --git a/internal/pkg/callbacks/rolling_upgrade_test.go b/internal/pkg/callbacks/rolling_upgrade_test.go index 75583de45..6e03d8fec 100644 --- a/internal/pkg/callbacks/rolling_upgrade_test.go +++ b/internal/pkg/callbacks/rolling_upgrade_test.go @@ -7,15 +7,20 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" watch "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" argorolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" fakeargoclientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake" @@ -48,8 +53,36 @@ func newTestFixtures() testFixtures { } func setupTestClients() kube.Clients { + fakeClient := fake.NewClientset() + + // Add a reactor to handle Pod DeleteCollection with support for filtering on Phase + fakeClient.PrependReactor("delete-collection", "pods", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + deleteAction, _ := action.(k8stesting.DeleteCollectionActionImpl) + labelSelector, fieldSelector, _ := k8stesting.ExtractFromListOptions(deleteAction.ListOptions) + gvk := v1.SchemeGroupVersion.WithKind("Pod") + obj, _ := fakeClient.Tracker().List(deleteAction.GetResource(), gvk, deleteAction.GetNamespace(), deleteAction.ListOptions) + podList, _ := obj.(*v1.PodList) + for _, pod := range podList.Items { + podFields := fields.Set(map[string]string{ + "status.phase": string(pod.Status.Phase), + }) + if fieldSelector != nil && !fieldSelector.Matches(podFields) { + continue + } + if !labelSelector.Matches(labels.Set(pod.Labels)) { + continue + } + logrus.Infof("Deleting pod %s via DeleteCollection", pod.Name) + err = fakeClient.Tracker().Delete(deleteAction.GetResource(), deleteAction.GetNamespace(), pod.Name) + if err != nil { + logrus.Errorf("Error deleting pod %s: %v", pod.Name, err) + } + } + return true, nil, nil + }) + return kube.Clients{ - KubernetesClient: fake.NewClientset(), + KubernetesClient: fakeClient, ArgoRolloutClient: fakeargoclientset.NewSimpleClientset(), } } @@ -412,30 +445,48 @@ func TestPatchResources(t *testing.T) { } } -func TestCreateJobFromCronjob(t *testing.T) { +func TestRestartRunningCronjobPods(t *testing.T) { fixtures := newTestFixtures() runtimeObj, err := createTestCronJobWithAnnotations(clients, fixtures.namespace, "1") - assert.NoError(t, err) - + require.NoError(t, err) cronJob := runtimeObj.(*batchv1.CronJob) - err = callbacks.CreateJobFromCronjob(clients, fixtures.namespace, cronJob) - assert.NoError(t, err) - jobList, err := clients.KubernetesClient.BatchV1().Jobs(fixtures.namespace).List(context.TODO(), metav1.ListOptions{}) - assert.NoError(t, err) + // Create running job and pod + runningJob, err := createTestJobForCronJob(clients, cronJob, "job1", true) + require.NoError(t, err) + runningPod, err := createTestPodForJob(clients, runningJob, "pod1", v1.PodRunning) + require.NoError(t, err) - ownerFound := false - for _, job := range jobList.Items { - if isControllerOwner("CronJob", cronJob.Name, job.OwnerReferences) { - ownerFound = true - break - } - } - assert.Truef(t, ownerFound, "Missing CronJob owner reference") + // Create succeeded job and pod + succeededJob, err := createTestJobForCronJob(clients, cronJob, "job2", false) + require.NoError(t, err) + succeededPod, err := createTestPodForJob(clients, succeededJob, "pod2", v1.PodSucceeded) + require.NoError(t, err) + + // Create a pod that is not associated with a Job + otherPod, err := createTestPod(clients, fixtures.namespace, "other", nil, v1.PodRunning) + require.NoError(t, err) + + // Run the restart + err = callbacks.RestartRunningCronjobPods(clients, fixtures.namespace, cronJob) + require.NoError(t, err) + // Verify running pod was deleted + _, err = clients.KubernetesClient.CoreV1().Pods(fixtures.namespace).Get(context.TODO(), runningPod.Name, metav1.GetOptions{}) + assert.Error(t, err, "Running pod should have been deleted") + + // Verify succeeded pod remains + _, err = clients.KubernetesClient.CoreV1().Pods(fixtures.namespace).Get(context.TODO(), succeededPod.Name, metav1.GetOptions{}) + assert.NoError(t, err, "Succeeded pod should not have been deleted") + + // Verify other pod remains + _, err = clients.KubernetesClient.CoreV1().Pods(fixtures.namespace).Get(context.TODO(), otherPod.Name, metav1.GetOptions{}) + assert.NoError(t, err, "Other pod should not have been deleted") + + // Clean up err = deleteTestCronJob(clients, fixtures.namespace, cronJob.Name) - assert.NoError(t, err) + require.NoError(t, err) } func TestReCreateJobFromJob(t *testing.T) { @@ -763,11 +814,56 @@ func deleteTestJob(clients kube.Clients, namespace, name string) error { return clients.KubernetesClient.BatchV1().Jobs(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) } -func isControllerOwner(kind, name string, ownerRefs []metav1.OwnerReference) bool { - for _, ownerRef := range ownerRefs { - if *ownerRef.Controller && ownerRef.Kind == kind && ownerRef.Name == name { - return true +// createTestJobForCronJob creates a minimal representation of a Job for the given CronJob +func createTestJobForCronJob(clients kube.Clients, cronJob *batchv1.CronJob, name string, active bool) (*batchv1.Job, error) { + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: cronJob.Name + "-" + name, + Namespace: cronJob.Namespace, + Labels: map[string]string{ + batchv1.ControllerUidLabel: string(cronJob.UID), + }, + }, + } + job, err := clients.KubernetesClient.BatchV1().Jobs(cronJob.Namespace).Create(context.TODO(), job, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + + // Update the CronJob status to include the new Job if it is active + if active { + cronJob.Status.Active = append(cronJob.Status.Active, v1.ObjectReference{ + Name: job.Name, + Namespace: job.Namespace, + }) + _, err = clients.KubernetesClient.BatchV1().CronJobs(cronJob.Namespace).Update(context.TODO(), cronJob, metav1.UpdateOptions{}) + if err != nil { + return nil, err } } - return false + return job, err +} + +// createTestPod creates a minimal representation of a pod +func createTestPod(clients kube.Clients, namespace, name string, metaLabels map[string]string, phase v1.PodPhase) (*v1.Pod, error) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: patchtypes.UID(name), + Name: name, + Namespace: namespace, + Labels: metaLabels, + }, + Status: v1.PodStatus{ + Phase: phase, + }, + } + return clients.KubernetesClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) +} + +// createTestPodForJob creates a minimal representation of a pod for the given Job +func createTestPodForJob(clients kube.Clients, job *batchv1.Job, name string, phase v1.PodPhase) (*v1.Pod, error) { + metaLabels := map[string]string{ + batchv1.JobNameLabel: job.Name, + } + return createTestPod(clients, job.Namespace, job.Name+"-"+name, metaLabels, phase) } diff --git a/internal/pkg/handler/upgrade.go b/internal/pkg/handler/upgrade.go index 982dbfad5..3f74592a0 100644 --- a/internal/pkg/handler/upgrade.go +++ b/internal/pkg/handler/upgrade.go @@ -52,7 +52,7 @@ func GetDeploymentRollingUpgradeFuncs() callbacks.RollingUpgradeFuncs { } } -// GetDeploymentRollingUpgradeFuncs returns all callback funcs for a cronjob +// GetCronJobCreateJobFuncs returns all callback funcs for a cronjob func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs { return callbacks.RollingUpgradeFuncs{ ItemFunc: callbacks.GetCronJobItem, @@ -61,7 +61,7 @@ func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs { PodAnnotationsFunc: callbacks.GetCronJobPodAnnotations, ContainersFunc: callbacks.GetCronJobContainers, InitContainersFunc: callbacks.GetCronJobInitContainers, - UpdateFunc: callbacks.CreateJobFromCronjob, + UpdateFunc: callbacks.RestartRunningCronjobPods, PatchFunc: callbacks.PatchCronJob, PatchTemplatesFunc: func() callbacks.PatchTemplates { return callbacks.PatchTemplates{} }, VolumesFunc: callbacks.GetCronJobVolumes, @@ -70,7 +70,7 @@ func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs { } } -// GetDeploymentRollingUpgradeFuncs returns all callback funcs for a cronjob +// GetJobCreateJobFuncs returns all callback funcs for a cronjob func GetJobCreateJobFuncs() callbacks.RollingUpgradeFuncs { return callbacks.RollingUpgradeFuncs{ ItemFunc: callbacks.GetJobItem, diff --git a/internal/pkg/testutil/kube.go b/internal/pkg/testutil/kube.go index a778eb15f..20868ddfa 100644 --- a/internal/pkg/testutil/kube.go +++ b/internal/pkg/testutil/kube.go @@ -41,7 +41,7 @@ var ( ConfigmapResourceType = "configMaps" // SecretResourceType is a resource type which controller watches for changes SecretResourceType = "secrets" - // SecretproviderclasspodstatusResourceType is a resource type which controller watches for changes + // SecretProviderClassPodStatusResourceType is a resource type which controller watches for changes SecretProviderClassPodStatusResourceType = "secretproviderclasspodstatuses" ) @@ -886,7 +886,7 @@ func CreateDeployment(client kubernetes.Interface, deploymentName string, namesp return deployment, err } -// CreateDeployment creates a deployment in given namespace and returns the Deployment +// CreateDeploymentWithAnnotations creates a deployment in given namespace and returns the Deployment func CreateDeploymentWithAnnotations(client kubernetes.Interface, deploymentName string, namespace string, additionalAnnotations map[string]string, volumeMount bool) (*appsv1.Deployment, error) { logrus.Infof("Creating Deployment") deploymentClient := client.AppsV1().Deployments(namespace) @@ -1088,7 +1088,7 @@ func DeleteCronJob(client kubernetes.Interface, namespace string, cronJobName st return cronJobError } -// Deleteob deletes a job in given namespace and returns the error if any +// DeleteJob deletes a job in given namespace and returns the error if any func DeleteJob(client kubernetes.Interface, namespace string, jobName string) error { logrus.Infof("Deleting Job %s", jobName) jobError := client.BatchV1().Jobs(namespace).Delete(context.TODO(), jobName, metav1.DeleteOptions{}) diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 9582929c4..501b93af4 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -25,7 +25,7 @@ type Clients struct { var ( // IsOpenshift is true if environment is Openshift, it is false if environment is Kubernetes IsOpenshift = isOpenshift() - // IsCSIEnabled is true if environment has CSI provider installed, otherwise false + // IsCSIInstalled is true if environment has CSI provider installed, otherwise false IsCSIInstalled = isCSIInstalled() )