From fcaf1912b86e872678bc03088765f967c68fa45e Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Thu, 11 Jun 2026 09:15:03 +0200 Subject: [PATCH] fix(nuker): handle sidecar containers when nuking --- pkg/k8s/pod/client.go | 15 ++++++---- pkg/nuker/nuker.go | 69 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 71 insertions(+), 13 deletions(-) diff --git a/pkg/k8s/pod/client.go b/pkg/k8s/pod/client.go index ae07294d9..1115c1d63 100644 --- a/pkg/k8s/pod/client.go +++ b/pkg/k8s/pod/client.go @@ -206,8 +206,10 @@ func (s PodRecreationState) String() string { } // WaitForPodRecreationAndCompletion waits for a pod to go through the complete lifecycle: -// DELETED -> ADDED -> RUNNING -> COMPLETED -func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespace, podName string) error { +// DELETED -> ADDED -> RUNNING -> COMPLETED. +// containerName selects which container's state drives the lifecycle; if empty, +// the pod's first container is used. +func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespace, podName, containerName string) error { c.log.Debugf("waiting for pod %s to complete recreation and execution lifecycle", podName) defer c.log.Debugf("watch for pod %s in namespace %s done", podName, namespace) @@ -229,7 +231,7 @@ func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespac for { select { case event := <-watcher.ResultChan(): - newState, err := c.processEventInState(event, currentState, podName) + newState, err := c.processEventInState(event, currentState, podName, containerName) if err != nil { return err } @@ -251,14 +253,17 @@ func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespac } // processEventInState handles state transitions based on the received event -func (c *Client) processEventInState(event watch.Event, currentState PodRecreationState, podName string) (PodRecreationState, error) { +func (c *Client) processEventInState(event watch.Event, currentState PodRecreationState, podName, containerName string) (PodRecreationState, error) { pod, ok := event.Object.(*v1.Pod) if !ok { c.log.Debugf("watch event is not a pod, skipping") return currentState, nil } - containerName := pod.Spec.Containers[0].Name + // Fall back to the first container when no specific one was requested. + if containerName == "" && len(pod.Spec.Containers) > 0 { + containerName = pod.Spec.Containers[0].Name + } switch currentState { case WaitingForDeletion: diff --git a/pkg/nuker/nuker.go b/pkg/nuker/nuker.go index 562c758c8..4d531cbed 100644 --- a/pkg/nuker/nuker.go +++ b/pkg/nuker/nuker.go @@ -11,6 +11,7 @@ import ( "github.com/ethersphere/beekeeper/pkg/node" "golang.org/x/sync/errgroup" v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/util/retry" ) @@ -203,8 +204,8 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str } // 1. Save the original state by creating a deep copy before any modifications. - if len(ss.Spec.Template.Spec.Containers) == 0 { - return errors.New("stateful set has no containers") + if beeContainerIndex(ss) < 0 { + return fmt.Errorf("stateful set %s has no %q container", ss.Name, beeContainerName) } originalSS := ss.DeepCopy() @@ -220,13 +221,20 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str return fmt.Errorf("failed to get latest stateful set %s: %w", ss.Name, err) } + beeIdx := beeContainerIndex(latestSS) + if beeIdx < 0 { + return fmt.Errorf("stateful set %s has no %q container", latestSS.Name, beeContainerName) + } + // Restore the original configuration from our deep copy latestSS.Spec.UpdateStrategy = originalSS.Spec.UpdateStrategy latestSS.Spec.Replicas = originalSS.Spec.Replicas - latestSS.Spec.Template.Spec.Containers[0].Command = restartArgs - latestSS.Spec.Template.Spec.Containers[0].ReadinessProbe = originalSS.Spec.Template.Spec.Containers[0].ReadinessProbe + latestSS.Spec.Template.Spec.Containers[beeIdx].Command = restartArgs + // Restore readiness probes on every container, including sidecars, + // matching them by name. + restoreReadinessProbes(latestSS, originalSS) if c.image != "" { - latestSS.Spec.Template.Spec.Containers[0].Image = c.image + latestSS.Spec.Template.Spec.Containers[beeIdx].Image = c.image } return c.k8sClient.StatefulSet.Update(ctx, namespace, latestSS) @@ -254,11 +262,20 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str return fmt.Errorf("failed to get latest stateful set %s: %w", ss.Name, err) } + beeIdx := beeContainerIndex(latestSS) + if beeIdx < 0 { + return fmt.Errorf("stateful set %s has no %q container", latestSS.Name, beeContainerName) + } + // Apply the changes for the nuke task. latestSS.Spec.UpdateStrategy.Type = v1.OnDeleteStatefulSetStrategyType latestSS.Spec.UpdateStrategy.RollingUpdate = nil - latestSS.Spec.Template.Spec.Containers[0].Command = updateArgs - latestSS.Spec.Template.Spec.Containers[0].ReadinessProbe = nil + latestSS.Spec.Template.Spec.Containers[beeIdx].Command = updateArgs + // Remove readiness probes from every container, including sidecars, so + // the pod can reach Ready while bee runs the short-lived nuke. + for i := range latestSS.Spec.Template.Spec.Containers { + latestSS.Spec.Template.Spec.Containers[i].ReadinessProbe = nil + } return c.k8sClient.StatefulSet.Update(ctx, namespace, latestSS) }); err != nil { @@ -266,8 +283,13 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str } // 3. Sequentially delete each pod and wait for it to be recreated and complete the task. + // Watch the bee container by name so completion is detected on the right + // container even when sidecars are present. c.log.Debugf("deleting pods in stateful set %s to trigger update task", ss.Name) - if err := c.recreatePodsAndWait(ctx, namespace, ss, c.k8sClient.Pods.WaitForPodRecreationAndCompletion); err != nil { + waitForCompletion := func(ctx context.Context, namespace, podName string) error { + return c.k8sClient.Pods.WaitForPodRecreationAndCompletion(ctx, namespace, podName, beeContainerName) + } + if err := c.recreatePodsAndWait(ctx, namespace, ss, waitForCompletion); err != nil { return fmt.Errorf("failed during pod update task for %s: %w", ss.Name, err) } c.log.Debugf("all pods for %s completed the update task", ss.Name) @@ -323,6 +345,37 @@ func (c *Client) recreatePodsAndWait(ctx context.Context, namespace string, ss * return g.Wait() } +// beeContainerName is the name of the Bee container in each pod's template. +const beeContainerName = "bee" + +// beeContainerIndex returns the index of the Bee container in the StatefulSet's +// pod template, or -1 if it is not present. +func beeContainerIndex(ss *v1.StatefulSet) int { + for i, c := range ss.Spec.Template.Spec.Containers { + if c.Name == beeContainerName { + return i + } + } + return -1 +} + +// restoreReadinessProbes copies each container's readiness probe from the +// original StatefulSet spec back onto the target spec, matching containers by +// name so the restore is independent of container ordering. +func restoreReadinessProbes(target, original *v1.StatefulSet) { + originalProbes := make(map[string]*corev1.Probe, len(original.Spec.Template.Spec.Containers)) + for _, c := range original.Spec.Template.Spec.Containers { + originalProbes[c.Name] = c.ReadinessProbe + } + + for i := range target.Spec.Template.Spec.Containers { + c := &target.Spec.Template.Spec.Containers[i] + if probe, ok := originalProbes[c.Name]; ok { + c.ReadinessProbe = probe + } + } +} + // getPodNames generates a list of pod names for a given StatefulSet based on its replicas. // Each pod name represents a Bee node. func getPodNames(ss *v1.StatefulSet) []string {