Skip to content

Commit 8cfb014

Browse files
committed
[INFRA-5852] Handle CronJob upgrade via active Job Pod deletion (#1)
Instead of immediately creating new Jobs we will instead delete pods belonging to active Jobs. This will cause the job controller to re-create the pods based on the defined policies. As such we will not violate the CronJob schedule, suspension and concurrency policies.
1 parent 81a7e7e commit 8cfb014

7 files changed

Lines changed: 153 additions & 51 deletions

File tree

deployments/kubernetes/chart/reloader/templates/clusterrole.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ rules:
9696
- list
9797
- get
9898
{{- end}}
99+
{{- if and .Values.reloader.ignoreCronJobs .Values.reloader.ignoreJobs }}{{- else }}
100+
- apiGroups:
101+
- ""
102+
resources:
103+
- pods
104+
verbs:
105+
- deletecollection
106+
{{- end }}
99107
{{- if .Values.reloader.enableHA }}
100108
- apiGroups:
101109
- "coordination.k8s.io"

deployments/kubernetes/manifests/clusterrole.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ rules:
5151
- delete
5252
- list
5353
- get
54+
- apiGroups:
55+
- ""
56+
resources:
57+
- pods
58+
verbs:
59+
- deletecollection
5460
- apiGroups:
5561
- ""
5662
resources:

internal/pkg/callbacks/rolling_upgrade.go

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ import (
1616
"k8s.io/apimachinery/pkg/runtime"
1717
patchtypes "k8s.io/apimachinery/pkg/types"
1818

19-
"maps"
20-
2119
argorolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
2220
)
2321

@@ -42,7 +40,7 @@ type UpdateFunc func(kube.Clients, string, runtime.Object) error
4240
// PatchFunc performs the resource patch
4341
type PatchFunc func(kube.Clients, string, runtime.Object, patchtypes.PatchType, []byte) error
4442

45-
// PatchTemplateFunc is a generic func to return strategic merge JSON patch template
43+
// PatchTemplatesFunc is a generic func to return strategic merge JSON patch template
4644
type PatchTemplatesFunc func() PatchTemplates
4745

4846
// AnnotationsFunc is a generic func to return annotations
@@ -442,26 +440,20 @@ func PatchDeployment(clients kube.Clients, namespace string, resource runtime.Ob
442440
return err
443441
}
444442

445-
// CreateJobFromCronjob performs rolling upgrade on cronjob
446-
func CreateJobFromCronjob(clients kube.Clients, namespace string, resource runtime.Object) error {
443+
// RestartRunningCronjobPods restarts all pods currently running cronjob jobs
444+
func RestartRunningCronjobPods(clients kube.Clients, namespace string, resource runtime.Object) error {
447445
cronJob := resource.(*batchv1.CronJob)
448-
449-
annotations := make(map[string]string)
450-
annotations["cronjob.kubernetes.io/instantiate"] = "manual"
451-
maps.Copy(annotations, cronJob.Spec.JobTemplate.Annotations)
452-
453-
job := &batchv1.Job{
454-
ObjectMeta: meta_v1.ObjectMeta{
455-
GenerateName: cronJob.Name + "-",
456-
Namespace: cronJob.Namespace,
457-
Annotations: annotations,
458-
Labels: cronJob.Spec.JobTemplate.Labels,
459-
OwnerReferences: []meta_v1.OwnerReference{*meta_v1.NewControllerRef(cronJob, batchv1.SchemeGroupVersion.WithKind("CronJob"))},
460-
},
461-
Spec: cronJob.Spec.JobTemplate.Spec,
462-
}
463-
_, err := clients.KubernetesClient.BatchV1().Jobs(namespace).Create(context.TODO(), job, meta_v1.CreateOptions{FieldManager: "Reloader"})
464-
return err
446+
for _, job := range cronJob.Status.Active {
447+
logrus.Debugf("Deleting running pods for active Job %s/%s", job.Namespace, job.Name)
448+
listOpts := meta_v1.ListOptions{
449+
LabelSelector: fmt.Sprintf("%s=%s", batchv1.JobNameLabel, job.Name),
450+
FieldSelector: "status.phase=Running",
451+
}
452+
if err := clients.KubernetesClient.CoreV1().Pods(namespace).DeleteCollection(context.TODO(), meta_v1.DeleteOptions{}, listOpts); err != nil {
453+
return err
454+
}
455+
}
456+
return nil
465457
}
466458

467459
func PatchCronJob(clients kube.Clients, namespace string, resource runtime.Object, patchType patchtypes.PatchType, bytes []byte) error {

internal/pkg/callbacks/rolling_upgrade_test.go

Lines changed: 118 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,20 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/sirupsen/logrus"
1011
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
1113
appsv1 "k8s.io/api/apps/v1"
1214
batchv1 "k8s.io/api/batch/v1"
1315
v1 "k8s.io/api/core/v1"
1416
"k8s.io/apimachinery/pkg/api/meta"
1517
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"k8s.io/apimachinery/pkg/fields"
19+
"k8s.io/apimachinery/pkg/labels"
1620
"k8s.io/apimachinery/pkg/runtime"
1721
watch "k8s.io/apimachinery/pkg/watch"
1822
"k8s.io/client-go/kubernetes/fake"
23+
k8stesting "k8s.io/client-go/testing"
1924

2025
argorolloutv1alpha1 "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
2126
fakeargoclientset "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/fake"
@@ -48,8 +53,36 @@ func newTestFixtures() testFixtures {
4853
}
4954

5055
func setupTestClients() kube.Clients {
56+
fakeClient := fake.NewClientset()
57+
58+
// Add a reactor to handle Pod DeleteCollection with support for filtering on Phase
59+
fakeClient.PrependReactor("delete-collection", "pods", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
60+
deleteAction, _ := action.(k8stesting.DeleteCollectionActionImpl)
61+
labelSelector, fieldSelector, _ := k8stesting.ExtractFromListOptions(deleteAction.ListOptions)
62+
gvk := v1.SchemeGroupVersion.WithKind("Pod")
63+
obj, _ := fakeClient.Tracker().List(deleteAction.GetResource(), gvk, deleteAction.GetNamespace(), deleteAction.ListOptions)
64+
podList, _ := obj.(*v1.PodList)
65+
for _, pod := range podList.Items {
66+
podFields := fields.Set(map[string]string{
67+
"status.phase": string(pod.Status.Phase),
68+
})
69+
if fieldSelector != nil && !fieldSelector.Matches(podFields) {
70+
continue
71+
}
72+
if !labelSelector.Matches(labels.Set(pod.Labels)) {
73+
continue
74+
}
75+
logrus.Infof("Deleting pod %s via DeleteCollection", pod.Name)
76+
err = fakeClient.Tracker().Delete(deleteAction.GetResource(), deleteAction.GetNamespace(), pod.Name)
77+
if err != nil {
78+
logrus.Errorf("Error deleting pod %s: %v", pod.Name, err)
79+
}
80+
}
81+
return true, nil, nil
82+
})
83+
5184
return kube.Clients{
52-
KubernetesClient: fake.NewClientset(),
85+
KubernetesClient: fakeClient,
5386
ArgoRolloutClient: fakeargoclientset.NewSimpleClientset(),
5487
}
5588
}
@@ -412,30 +445,48 @@ func TestPatchResources(t *testing.T) {
412445
}
413446
}
414447

415-
func TestCreateJobFromCronjob(t *testing.T) {
448+
func TestRestartRunningCronjobPods(t *testing.T) {
416449
fixtures := newTestFixtures()
417450

418451
runtimeObj, err := createTestCronJobWithAnnotations(clients, fixtures.namespace, "1")
419-
assert.NoError(t, err)
420-
452+
require.NoError(t, err)
421453
cronJob := runtimeObj.(*batchv1.CronJob)
422-
err = callbacks.CreateJobFromCronjob(clients, fixtures.namespace, cronJob)
423-
assert.NoError(t, err)
424454

425-
jobList, err := clients.KubernetesClient.BatchV1().Jobs(fixtures.namespace).List(context.TODO(), metav1.ListOptions{})
426-
assert.NoError(t, err)
455+
// Create running job and pod
456+
runningJob, err := createTestJobForCronJob(clients, cronJob, "job1", true)
457+
require.NoError(t, err)
458+
runningPod, err := createTestPodForJob(clients, runningJob, "pod1", v1.PodRunning)
459+
require.NoError(t, err)
427460

428-
ownerFound := false
429-
for _, job := range jobList.Items {
430-
if isControllerOwner("CronJob", cronJob.Name, job.OwnerReferences) {
431-
ownerFound = true
432-
break
433-
}
434-
}
435-
assert.Truef(t, ownerFound, "Missing CronJob owner reference")
461+
// Create succeeded job and pod
462+
succeededJob, err := createTestJobForCronJob(clients, cronJob, "job2", false)
463+
require.NoError(t, err)
464+
succeededPod, err := createTestPodForJob(clients, succeededJob, "pod2", v1.PodSucceeded)
465+
require.NoError(t, err)
466+
467+
// Create a pod that is not associated with a Job
468+
otherPod, err := createTestPod(clients, fixtures.namespace, "other", nil, v1.PodRunning)
469+
require.NoError(t, err)
470+
471+
// Run the restart
472+
err = callbacks.RestartRunningCronjobPods(clients, fixtures.namespace, cronJob)
473+
require.NoError(t, err)
436474

475+
// Verify running pod was deleted
476+
_, err = clients.KubernetesClient.CoreV1().Pods(fixtures.namespace).Get(context.TODO(), runningPod.Name, metav1.GetOptions{})
477+
assert.Error(t, err, "Running pod should have been deleted")
478+
479+
// Verify succeeded pod remains
480+
_, err = clients.KubernetesClient.CoreV1().Pods(fixtures.namespace).Get(context.TODO(), succeededPod.Name, metav1.GetOptions{})
481+
assert.NoError(t, err, "Succeeded pod should not have been deleted")
482+
483+
// Verify other pod remains
484+
_, err = clients.KubernetesClient.CoreV1().Pods(fixtures.namespace).Get(context.TODO(), otherPod.Name, metav1.GetOptions{})
485+
assert.NoError(t, err, "Other pod should not have been deleted")
486+
487+
// Clean up
437488
err = deleteTestCronJob(clients, fixtures.namespace, cronJob.Name)
438-
assert.NoError(t, err)
489+
require.NoError(t, err)
439490
}
440491

441492
func TestReCreateJobFromJob(t *testing.T) {
@@ -763,11 +814,56 @@ func deleteTestJob(clients kube.Clients, namespace, name string) error {
763814
return clients.KubernetesClient.BatchV1().Jobs(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
764815
}
765816

766-
func isControllerOwner(kind, name string, ownerRefs []metav1.OwnerReference) bool {
767-
for _, ownerRef := range ownerRefs {
768-
if *ownerRef.Controller && ownerRef.Kind == kind && ownerRef.Name == name {
769-
return true
817+
// createTestJobForCronJob creates a minimal representation of a Job for the given CronJob
818+
func createTestJobForCronJob(clients kube.Clients, cronJob *batchv1.CronJob, name string, active bool) (*batchv1.Job, error) {
819+
job := &batchv1.Job{
820+
ObjectMeta: metav1.ObjectMeta{
821+
Name: cronJob.Name + "-" + name,
822+
Namespace: cronJob.Namespace,
823+
Labels: map[string]string{
824+
batchv1.ControllerUidLabel: string(cronJob.UID),
825+
},
826+
},
827+
}
828+
job, err := clients.KubernetesClient.BatchV1().Jobs(cronJob.Namespace).Create(context.TODO(), job, metav1.CreateOptions{})
829+
if err != nil {
830+
return nil, err
831+
}
832+
833+
// Update the CronJob status to include the new Job if it is active
834+
if active {
835+
cronJob.Status.Active = append(cronJob.Status.Active, v1.ObjectReference{
836+
Name: job.Name,
837+
Namespace: job.Namespace,
838+
})
839+
_, err = clients.KubernetesClient.BatchV1().CronJobs(cronJob.Namespace).Update(context.TODO(), cronJob, metav1.UpdateOptions{})
840+
if err != nil {
841+
return nil, err
770842
}
771843
}
772-
return false
844+
return job, err
845+
}
846+
847+
// createTestPod creates a minimal representation of a pod
848+
func createTestPod(clients kube.Clients, namespace, name string, metaLabels map[string]string, phase v1.PodPhase) (*v1.Pod, error) {
849+
pod := &v1.Pod{
850+
ObjectMeta: metav1.ObjectMeta{
851+
UID: patchtypes.UID(name),
852+
Name: name,
853+
Namespace: namespace,
854+
Labels: metaLabels,
855+
},
856+
Status: v1.PodStatus{
857+
Phase: phase,
858+
},
859+
}
860+
return clients.KubernetesClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
861+
}
862+
863+
// createTestPodForJob creates a minimal representation of a pod for the given Job
864+
func createTestPodForJob(clients kube.Clients, job *batchv1.Job, name string, phase v1.PodPhase) (*v1.Pod, error) {
865+
metaLabels := map[string]string{
866+
batchv1.JobNameLabel: job.Name,
867+
}
868+
return createTestPod(clients, job.Namespace, job.Name+"-"+name, metaLabels, phase)
773869
}

internal/pkg/handler/upgrade.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func GetDeploymentRollingUpgradeFuncs() callbacks.RollingUpgradeFuncs {
5252
}
5353
}
5454

55-
// GetDeploymentRollingUpgradeFuncs returns all callback funcs for a cronjob
55+
// GetCronJobCreateJobFuncs returns all callback funcs for a cronjob
5656
func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs {
5757
return callbacks.RollingUpgradeFuncs{
5858
ItemFunc: callbacks.GetCronJobItem,
@@ -61,7 +61,7 @@ func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs {
6161
PodAnnotationsFunc: callbacks.GetCronJobPodAnnotations,
6262
ContainersFunc: callbacks.GetCronJobContainers,
6363
InitContainersFunc: callbacks.GetCronJobInitContainers,
64-
UpdateFunc: callbacks.CreateJobFromCronjob,
64+
UpdateFunc: callbacks.RestartRunningCronjobPods,
6565
PatchFunc: callbacks.PatchCronJob,
6666
PatchTemplatesFunc: func() callbacks.PatchTemplates { return callbacks.PatchTemplates{} },
6767
VolumesFunc: callbacks.GetCronJobVolumes,
@@ -70,7 +70,7 @@ func GetCronJobCreateJobFuncs() callbacks.RollingUpgradeFuncs {
7070
}
7171
}
7272

73-
// GetDeploymentRollingUpgradeFuncs returns all callback funcs for a cronjob
73+
// GetJobCreateJobFuncs returns all callback funcs for a cronjob
7474
func GetJobCreateJobFuncs() callbacks.RollingUpgradeFuncs {
7575
return callbacks.RollingUpgradeFuncs{
7676
ItemFunc: callbacks.GetJobItem,

internal/pkg/testutil/kube.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ var (
4141
ConfigmapResourceType = "configMaps"
4242
// SecretResourceType is a resource type which controller watches for changes
4343
SecretResourceType = "secrets"
44-
// SecretproviderclasspodstatusResourceType is a resource type which controller watches for changes
44+
// SecretProviderClassPodStatusResourceType is a resource type which controller watches for changes
4545
SecretProviderClassPodStatusResourceType = "secretproviderclasspodstatuses"
4646
)
4747

@@ -886,7 +886,7 @@ func CreateDeployment(client kubernetes.Interface, deploymentName string, namesp
886886
return deployment, err
887887
}
888888

889-
// CreateDeployment creates a deployment in given namespace and returns the Deployment
889+
// CreateDeploymentWithAnnotations creates a deployment in given namespace and returns the Deployment
890890
func CreateDeploymentWithAnnotations(client kubernetes.Interface, deploymentName string, namespace string, additionalAnnotations map[string]string, volumeMount bool) (*appsv1.Deployment, error) {
891891
logrus.Infof("Creating Deployment")
892892
deploymentClient := client.AppsV1().Deployments(namespace)
@@ -1088,7 +1088,7 @@ func DeleteCronJob(client kubernetes.Interface, namespace string, cronJobName st
10881088
return cronJobError
10891089
}
10901090

1091-
// Deleteob deletes a job in given namespace and returns the error if any
1091+
// DeleteJob deletes a job in given namespace and returns the error if any
10921092
func DeleteJob(client kubernetes.Interface, namespace string, jobName string) error {
10931093
logrus.Infof("Deleting Job %s", jobName)
10941094
jobError := client.BatchV1().Jobs(namespace).Delete(context.TODO(), jobName, metav1.DeleteOptions{})

pkg/kube/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type Clients struct {
2525
var (
2626
// IsOpenshift is true if environment is Openshift, it is false if environment is Kubernetes
2727
IsOpenshift = isOpenshift()
28-
// IsCSIEnabled is true if environment has CSI provider installed, otherwise false
28+
// IsCSIInstalled is true if environment has CSI provider installed, otherwise false
2929
IsCSIInstalled = isCSIInstalled()
3030
)
3131

0 commit comments

Comments
 (0)