From fa71bbf120e80625a18d1924689c46459e152ef0 Mon Sep 17 00:00:00 2001 From: Trevor North Date: Fri, 13 Feb 2026 18:01:02 +0000 Subject: [PATCH] Handle CronJob upgrade via active Job Pod deletion Instead of immediately creating new Jobs we will instead delete pods belonging to active Jobs. This will cause the job controller to re-create the pods based on the defined policies. As such we will not violate the CronJob schedule, suspension and concurrency policies. --- .../chart/reloader/templates/clusterrole.yaml | 8 + .../kubernetes/manifests/clusterrole.yaml | 6 + internal/pkg/callbacks/rolling_upgrade.go | 36 ++--- .../pkg/callbacks/rolling_upgrade_test.go | 140 +++++++++++++++--- internal/pkg/handler/upgrade.go | 6 +- internal/pkg/testutil/kube.go | 6 +- pkg/kube/client.go | 2 +- 7 files changed, 153 insertions(+), 51 deletions(-) diff --git a/deployments/kubernetes/chart/reloader/templates/clusterrole.yaml b/deployments/kubernetes/chart/reloader/templates/clusterrole.yaml index bd14dfeb7..a800f655f 100644 --- a/deployments/kubernetes/chart/reloader/templates/clusterrole.yaml +++ b/deployments/kubernetes/chart/reloader/templates/clusterrole.yaml @@ -96,6 +96,14 @@ rules: - list - get {{- end}} +{{- if and .Values.reloader.ignoreCronJobs .Values.reloader.ignoreJobs }}{{- else }} + - apiGroups: + - "" + resources: + - pods + verbs: + - deletecollection +{{- end }} {{- if .Values.reloader.enableHA }} - apiGroups: - "coordination.k8s.io" diff --git a/deployments/kubernetes/manifests/clusterrole.yaml b/deployments/kubernetes/manifests/clusterrole.yaml index f2fc68104..c8ff7f19b 100644 --- a/deployments/kubernetes/manifests/clusterrole.yaml +++ b/deployments/kubernetes/manifests/clusterrole.yaml @@ -51,6 +51,12 @@ rules: - delete - list - get + - apiGroups: + - "" + resources: + - pods + verbs: + - deletecollection - apiGroups: - "" resources: diff --git a/internal/pkg/callbacks/rolling_upgrade.go b/internal/pkg/callbacks/rolling_upgrade.go index 13e5a63cd..9542e7795 
100644 --- a/internal/pkg/callbacks/rolling_upgrade.go +++ b/internal/pkg/callbacks/rolling_upgrade.go @@ -16,8 +16,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" patchtypes "k8s.io/apimachinery/pkg/types" - "maps" - argorolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" ) @@ -42,7 +40,7 @@ type UpdateFunc func(kube.Clients, string, runtime.Object) error // PatchFunc performs the resource patch type PatchFunc func(kube.Clients, string, runtime.Object, patchtypes.PatchType, []byte) error -// PatchTemplateFunc is a generic func to return strategic merge JSON patch template +// PatchTemplatesFunc is a generic func to return strategic merge JSON patch template type PatchTemplatesFunc func() PatchTemplates // AnnotationsFunc is a generic func to return annotations @@ -442,26 +440,20 @@ func PatchDeployment(clients kube.Clients, namespace string, resource runtime.Ob return err } -// CreateJobFromCronjob performs rolling upgrade on cronjob -func CreateJobFromCronjob(clients kube.Clients, namespace string, resource runtime.Object) error { +// RestartRunningCronjobPods restarts all pods currently running cronjob jobs +func RestartRunningCronjobPods(clients kube.Clients, namespace string, resource runtime.Object) error { cronJob := resource.(*batchv1.CronJob) - - annotations := make(map[string]string) - annotations["cronjob.kubernetes.io/instantiate"] = "manual" - maps.Copy(annotations, cronJob.Spec.JobTemplate.Annotations) - - job := &batchv1.Job{ - ObjectMeta: meta_v1.ObjectMeta{ - GenerateName: cronJob.Name + "-", - Namespace: cronJob.Namespace, - Annotations: annotations, - Labels: cronJob.Spec.JobTemplate.Labels, - OwnerReferences: []meta_v1.OwnerReference{*meta_v1.NewControllerRef(cronJob, batchv1.SchemeGroupVersion.WithKind("CronJob"))}, - }, - Spec: cronJob.Spec.JobTemplate.Spec, - } - _, err := clients.KubernetesClient.BatchV1().Jobs(namespace).Create(context.TODO(), job, meta_v1.CreateOptions{FieldManager: "Reloader"}) - return err + 
for _, job := range cronJob.Status.Active { + logrus.Debugf("Deleting running pods for active Job %s/%s", job.Namespace, job.Name) + listOpts := meta_v1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", batchv1.JobNameLabel, job.Name), + FieldSelector: "status.phase=Running", + } + if err := clients.KubernetesClient.CoreV1().Pods(namespace).DeleteCollection(context.TODO(), meta_v1.DeleteOptions{}, listOpts); err != nil { + return err + } + } + return nil } func PatchCronJob(clients kube.Clients, namespace string, resource runtime.Object, patchType patchtypes.PatchType, bytes []byte) error { diff --git a/internal/pkg/callbacks/rolling_upgrade_test.go b/internal/pkg/callbacks/rolling_upgrade_test.go index 75583de45..6e03d8fec 100644 --- a/internal/pkg/callbacks/rolling_upgrade_test.go +++ b/internal/pkg/callbacks/rolling_upgrade_test.go @@ -7,15 +7,20 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" watch "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" argorolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1" fakeargoclientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake" @@ -48,8 +53,36 @@ func newTestFixtures() testFixtures { } func setupTestClients() kube.Clients { + fakeClient := fake.NewClientset() + + // Add a reactor to handle Pod DeleteCollection with support for filtering on Phase + fakeClient.PrependReactor("delete-collection", "pods", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + deleteAction, _ := action.(k8stesting.DeleteCollectionActionImpl) + labelSelector, 
fieldSelector, _ := k8stesting.ExtractFromListOptions(deleteAction.ListOptions) + gvk := v1.SchemeGroupVersion.WithKind("Pod") + obj, _ := fakeClient.Tracker().List(deleteAction.GetResource(), gvk, deleteAction.GetNamespace(), deleteAction.ListOptions) + podList, _ := obj.(*v1.PodList) + for _, pod := range podList.Items { + podFields := fields.Set(map[string]string{ + "status.phase": string(pod.Status.Phase), + }) + if fieldSelector != nil && !fieldSelector.Matches(podFields) { + continue + } + if !labelSelector.Matches(labels.Set(pod.Labels)) { + continue + } + logrus.Infof("Deleting pod %s via DeleteCollection", pod.Name) + err = fakeClient.Tracker().Delete(deleteAction.GetResource(), deleteAction.GetNamespace(), pod.Name) + if err != nil { + logrus.Errorf("Error deleting pod %s: %v", pod.Name, err) + } + } + return true, nil, nil + }) + return kube.Clients{ - KubernetesClient: fake.NewClientset(), + KubernetesClient: fakeClient, ArgoRolloutClient: fakeargoclientset.NewSimpleClientset(), } } @@ -412,30 +445,48 @@ func TestPatchResources(t *testing.T) { } } -func TestCreateJobFromCronjob(t *testing.T) { +func TestRestartRunningCronjobPods(t *testing.T) { fixtures := newTestFixtures() runtimeObj, err := createTestCronJobWithAnnotations(clients, fixtures.namespace, "1") - assert.NoError(t, err) - + require.NoError(t, err) cronJob := runtimeObj.(*batchv1.CronJob) - err = callbacks.CreateJobFromCronjob(clients, fixtures.namespace, cronJob) - assert.NoError(t, err) - jobList, err := clients.KubernetesClient.BatchV1().Jobs(fixtures.namespace).List(context.TODO(), metav1.ListOptions{}) - assert.NoError(t, err) + // Create running job and pod + runningJob, err := createTestJobForCronJob(clients, cronJob, "job1", true) + require.NoError(t, err) + runningPod, err := createTestPodForJob(clients, runningJob, "pod1", v1.PodRunning) + require.NoError(t, err) - ownerFound := false - for _, job := range jobList.Items { - if isControllerOwner("CronJob", cronJob.Name, 
job.OwnerReferences) { - ownerFound = true - break - } - } - assert.Truef(t, ownerFound, "Missing CronJob owner reference") + // Create succeeded job and pod + succeededJob, err := createTestJobForCronJob(clients, cronJob, "job2", false) + require.NoError(t, err) + succeededPod, err := createTestPodForJob(clients, succeededJob, "pod2", v1.PodSucceeded) + require.NoError(t, err) + + // Create a pod that is not associated with a Job + otherPod, err := createTestPod(clients, fixtures.namespace, "other", nil, v1.PodRunning) + require.NoError(t, err) + + // Run the restart + err = callbacks.RestartRunningCronjobPods(clients, fixtures.namespace, cronJob) + require.NoError(t, err) + // Verify running pod was deleted + _, err = clients.KubernetesClient.CoreV1().Pods(fixtures.namespace).Get(context.TODO(), runningPod.Name, metav1.GetOptions{}) + assert.Error(t, err, "Running pod should have been deleted") + + // Verify succeeded pod remains + _, err = clients.KubernetesClient.CoreV1().Pods(fixtures.namespace).Get(context.TODO(), succeededPod.Name, metav1.GetOptions{}) + assert.NoError(t, err, "Succeeded pod should not have been deleted") + + // Verify other pod remains + _, err = clients.KubernetesClient.CoreV1().Pods(fixtures.namespace).Get(context.TODO(), otherPod.Name, metav1.GetOptions{}) + assert.NoError(t, err, "Other pod should not have been deleted") + + // Clean up err = deleteTestCronJob(clients, fixtures.namespace, cronJob.Name) - assert.NoError(t, err) + require.NoError(t, err) } func TestReCreateJobFromJob(t *testing.T) { @@ -763,11 +814,56 @@ func deleteTestJob(clients kube.Clients, namespace, name string) error { return clients.KubernetesClient.BatchV1().Jobs(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) } -func isControllerOwner(kind, name string, ownerRefs []metav1.OwnerReference) bool { - for _, ownerRef := range ownerRefs { - if *ownerRef.Controller && ownerRef.Kind == kind && ownerRef.Name == name { - return true +// 
createTestJobForCronJob creates a minimal representation of a Job for the given CronJob +func createTestJobForCronJob(clients kube.Clients, cronJob *batchv1.CronJob, name string, active bool) (*batchv1.Job, error) { + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: cronJob.Name + "-" + name, + Namespace: cronJob.Namespace, + Labels: map[string]string{ + batchv1.ControllerUidLabel: string(cronJob.UID), + }, + }, + } + job, err := clients.KubernetesClient.BatchV1().Jobs(cronJob.Namespace).Create(context.TODO(), job, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + + // Update the CronJob status to include the new Job if it is active + if active { + cronJob.Status.Active = append(cronJob.Status.Active, v1.ObjectReference{ + Name: job.Name, + Namespace: job.Namespace, + }) + _, err = clients.KubernetesClient.BatchV1().CronJobs(cronJob.Namespace).Update(context.TODO(), cronJob, metav1.UpdateOptions{}) + if err != nil { + return nil, err } } - return false + return job, err +} + +// createTestPod creates a minimal representation of a pod +func createTestPod(clients kube.Clients, namespace, name string, metaLabels map[string]string, phase v1.PodPhase) (*v1.Pod, error) { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: patchtypes.UID(name), + Name: name, + Namespace: namespace, + Labels: metaLabels, + }, + Status: v1.PodStatus{ + Phase: phase, + }, + } + return clients.KubernetesClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) +} + +// createTestPodForJob creates a minimal representation of a pod for the given Job +func createTestPodForJob(clients kube.Clients, job *batchv1.Job, name string, phase v1.PodPhase) (*v1.Pod, error) { + metaLabels := map[string]string{ + batchv1.JobNameLabel: job.Name, + } + return createTestPod(clients, job.Namespace, job.Name+"-"+name, metaLabels, phase) } diff --git a/internal/pkg/handler/upgrade.go b/internal/pkg/handler/upgrade.go index 982dbfad5..3f74592a0 100644 
--- a/internal/pkg/handler/upgrade.go +++ b/internal/pkg/handler/upgrade.go @@ -52,7 +52,7 @@ func GetDeploymentRollingUpgradeFuncs() callbacks.RollingUpgradeFuncs { } } -// GetDeploymentRollingUpgradeFuncs returns all callback funcs for a cronjob +// GetCronJobCreateJobFuncs returns all callback funcs for a cronjob func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs { return callbacks.RollingUpgradeFuncs{ ItemFunc: callbacks.GetCronJobItem, @@ -61,7 +61,7 @@ func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs { PodAnnotationsFunc: callbacks.GetCronJobPodAnnotations, ContainersFunc: callbacks.GetCronJobContainers, InitContainersFunc: callbacks.GetCronJobInitContainers, - UpdateFunc: callbacks.CreateJobFromCronjob, + UpdateFunc: callbacks.RestartRunningCronjobPods, PatchFunc: callbacks.PatchCronJob, PatchTemplatesFunc: func() callbacks.PatchTemplates { return callbacks.PatchTemplates{} }, VolumesFunc: callbacks.GetCronJobVolumes, @@ -70,7 +70,7 @@ } } -// GetDeploymentRollingUpgradeFuncs returns all callback funcs for a cronjob +// GetJobCreateJobFuncs returns all callback funcs for a job func GetJobCreateJobFuncs() callbacks.RollingUpgradeFuncs { return callbacks.RollingUpgradeFuncs{ ItemFunc: callbacks.GetJobItem, diff --git a/internal/pkg/testutil/kube.go b/internal/pkg/testutil/kube.go index a778eb15f..20868ddfa 100644 --- a/internal/pkg/testutil/kube.go +++ b/internal/pkg/testutil/kube.go @@ -41,7 +41,7 @@ var ( ConfigmapResourceType = "configMaps" // SecretResourceType is a resource type which controller watches for changes SecretResourceType = "secrets" - // SecretproviderclasspodstatusResourceType is a resource type which controller watches for changes + // SecretProviderClassPodStatusResourceType is a resource type which controller watches for changes SecretProviderClassPodStatusResourceType = "secretproviderclasspodstatuses" ) @@ -886,7 +886,7 @@ func 
CreateDeployment(client kubernetes.Interface, deploymentName string, namesp return deployment, err } -// CreateDeployment creates a deployment in given namespace and returns the Deployment +// CreateDeploymentWithAnnotations creates a deployment in given namespace and returns the Deployment func CreateDeploymentWithAnnotations(client kubernetes.Interface, deploymentName string, namespace string, additionalAnnotations map[string]string, volumeMount bool) (*appsv1.Deployment, error) { logrus.Infof("Creating Deployment") deploymentClient := client.AppsV1().Deployments(namespace) @@ -1088,7 +1088,7 @@ func DeleteCronJob(client kubernetes.Interface, namespace string, cronJobName st return cronJobError } -// Deleteob deletes a job in given namespace and returns the error if any +// DeleteJob deletes a job in given namespace and returns the error if any func DeleteJob(client kubernetes.Interface, namespace string, jobName string) error { logrus.Infof("Deleting Job %s", jobName) jobError := client.BatchV1().Jobs(namespace).Delete(context.TODO(), jobName, metav1.DeleteOptions{}) diff --git a/pkg/kube/client.go b/pkg/kube/client.go index 9582929c4..501b93af4 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -25,7 +25,7 @@ type Clients struct { var ( // IsOpenshift is true if environment is Openshift, it is false if environment is Kubernetes IsOpenshift = isOpenshift() - // IsCSIEnabled is true if environment has CSI provider installed, otherwise false + // IsCSIInstalled is true if environment has CSI provider installed, otherwise false IsCSIInstalled = isCSIInstalled() )