Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions pkg/k8s/pod/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,10 @@ func (s PodRecreationState) String() string {
}

// WaitForPodRecreationAndCompletion waits for a pod to go through the complete lifecycle:
// DELETED -> ADDED -> RUNNING -> COMPLETED
func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespace, podName string) error {
// DELETED -> ADDED -> RUNNING -> COMPLETED.
// containerName selects which container's state drives the lifecycle; if empty,
// the pod's first container is used.
func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespace, podName, containerName string) error {
c.log.Debugf("waiting for pod %s to complete recreation and execution lifecycle", podName)
defer c.log.Debugf("watch for pod %s in namespace %s done", podName, namespace)

Expand All @@ -229,7 +231,7 @@ func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespac
for {
select {
case event := <-watcher.ResultChan():
newState, err := c.processEventInState(event, currentState, podName)
newState, err := c.processEventInState(event, currentState, podName, containerName)
if err != nil {
return err
}
Expand All @@ -251,14 +253,17 @@ func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespac
}

// processEventInState returns the next recreation state for the received watch
// event. Events whose object is not a Pod leave the current state unchanged.
func (c *Client) processEventInState(event watch.Event, currentState PodRecreationState, podName string) (PodRecreationState, error) {
func (c *Client) processEventInState(event watch.Event, currentState PodRecreationState, podName, containerName string) (PodRecreationState, error) {
pod, ok := event.Object.(*v1.Pod)
if !ok {
c.log.Debugf("watch event is not a pod, skipping")
return currentState, nil
}

containerName := pod.Spec.Containers[0].Name
// Fall back to the first container when no specific one was requested.
if containerName == "" && len(pod.Spec.Containers) > 0 {
containerName = pod.Spec.Containers[0].Name
}

switch currentState {
case WaitingForDeletion:
Expand Down
69 changes: 61 additions & 8 deletions pkg/nuker/nuker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ethersphere/beekeeper/pkg/node"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/util/retry"
)
Expand Down Expand Up @@ -203,8 +204,8 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str
}

// 1. Save the original state by creating a deep copy before any modifications.
if len(ss.Spec.Template.Spec.Containers) == 0 {
return errors.New("stateful set has no containers")
if beeContainerIndex(ss) < 0 {
return fmt.Errorf("stateful set %s has no %q container", ss.Name, beeContainerName)
}
originalSS := ss.DeepCopy()

Expand All @@ -220,13 +221,20 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str
return fmt.Errorf("failed to get latest stateful set %s: %w", ss.Name, err)
}

beeIdx := beeContainerIndex(latestSS)
if beeIdx < 0 {
return fmt.Errorf("stateful set %s has no %q container", latestSS.Name, beeContainerName)
}

// Restore the original configuration from our deep copy
latestSS.Spec.UpdateStrategy = originalSS.Spec.UpdateStrategy
latestSS.Spec.Replicas = originalSS.Spec.Replicas
latestSS.Spec.Template.Spec.Containers[0].Command = restartArgs
latestSS.Spec.Template.Spec.Containers[0].ReadinessProbe = originalSS.Spec.Template.Spec.Containers[0].ReadinessProbe
latestSS.Spec.Template.Spec.Containers[beeIdx].Command = restartArgs
// Restore readiness probes on every container, including sidecars,
// matching them by name.
restoreReadinessProbes(latestSS, originalSS)
if c.image != "" {
latestSS.Spec.Template.Spec.Containers[0].Image = c.image
latestSS.Spec.Template.Spec.Containers[beeIdx].Image = c.image
}

return c.k8sClient.StatefulSet.Update(ctx, namespace, latestSS)
Expand Down Expand Up @@ -254,20 +262,34 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str
return fmt.Errorf("failed to get latest stateful set %s: %w", ss.Name, err)
}

beeIdx := beeContainerIndex(latestSS)
if beeIdx < 0 {
return fmt.Errorf("stateful set %s has no %q container", latestSS.Name, beeContainerName)
}

// Apply the changes for the nuke task.
latestSS.Spec.UpdateStrategy.Type = v1.OnDeleteStatefulSetStrategyType
latestSS.Spec.UpdateStrategy.RollingUpdate = nil
latestSS.Spec.Template.Spec.Containers[0].Command = updateArgs
latestSS.Spec.Template.Spec.Containers[0].ReadinessProbe = nil
latestSS.Spec.Template.Spec.Containers[beeIdx].Command = updateArgs
// Remove readiness probes from every container, including sidecars, so
// the pod can reach Ready while bee runs the short-lived nuke.
for i := range latestSS.Spec.Template.Spec.Containers {
latestSS.Spec.Template.Spec.Containers[i].ReadinessProbe = nil
}

return c.k8sClient.StatefulSet.Update(ctx, namespace, latestSS)
}); err != nil {
return fmt.Errorf("failed to apply update spec to stateful set %s: %w", ss.Name, err)
}

// 3. Sequentially delete each pod and wait for it to be recreated and complete the task.
// Watch the bee container by name so completion is detected on the right
// container even when sidecars are present.
c.log.Debugf("deleting pods in stateful set %s to trigger update task", ss.Name)
if err := c.recreatePodsAndWait(ctx, namespace, ss, c.k8sClient.Pods.WaitForPodRecreationAndCompletion); err != nil {
waitForCompletion := func(ctx context.Context, namespace, podName string) error {
return c.k8sClient.Pods.WaitForPodRecreationAndCompletion(ctx, namespace, podName, beeContainerName)
}
if err := c.recreatePodsAndWait(ctx, namespace, ss, waitForCompletion); err != nil {
return fmt.Errorf("failed during pod update task for %s: %w", ss.Name, err)
}
c.log.Debugf("all pods for %s completed the update task", ss.Name)
Expand Down Expand Up @@ -323,6 +345,37 @@ func (c *Client) recreatePodsAndWait(ctx context.Context, namespace string, ss *
return g.Wait()
}

// beeContainerName is the name of the Bee container in each pod's template.
// beeContainerIndex matches on it to locate the container in a StatefulSet, and
// it is passed to WaitForPodRecreationAndCompletion to select which container's
// state is watched.
const beeContainerName = "bee"

// beeContainerIndex reports the position of the first container named
// beeContainerName within the StatefulSet's pod template. It returns -1 when
// the template has no such container.
func beeContainerIndex(ss *v1.StatefulSet) int {
	containers := ss.Spec.Template.Spec.Containers
	for idx := 0; idx < len(containers); idx++ {
		if containers[idx].Name == beeContainerName {
			return idx
		}
	}
	return -1
}

// restoreReadinessProbes puts the readiness probes recorded in original back
// onto the containers of target. Containers are paired by name rather than by
// position, so the two specs may list them in different orders. A target
// container with no same-named counterpart in original is left as it is.
func restoreReadinessProbes(target, original *v1.StatefulSet) {
	// Index the saved probes by container name.
	saved := original.Spec.Template.Spec.Containers
	probeByName := make(map[string]*corev1.Probe, len(saved))
	for i := range saved {
		probeByName[saved[i].Name] = saved[i].ReadinessProbe
	}

	// live shares its backing array with target's container slice, so the
	// assignments below modify target in place.
	live := target.Spec.Template.Spec.Containers
	for i := range live {
		probe, found := probeByName[live[i].Name]
		if !found {
			continue
		}
		live[i].ReadinessProbe = probe
	}
}

// getPodNames generates a list of pod names for a given StatefulSet based on its replicas.
// Each pod name represents a Bee node.
func getPodNames(ss *v1.StatefulSet) []string {
Expand Down
Loading