diff --git a/pkg/controller/chi/controller.go b/pkg/controller/chi/controller.go index 13e481ab2..484fd37cc 100644 --- a/pkg/controller/chi/controller.go +++ b/pkg/controller/chi/controller.go @@ -959,6 +959,15 @@ func (c *Controller) handleObject(obj interface{}) { } func ShouldEnqueue(cr *api.ClickHouseInstallation) bool { + if cr == nil { + return false + } + + if cr.Spec.Suspend.Value() { + log.V(2).M(cr).Info("skip enqueue, CHI is suspended") + return false + } + ns := cr.GetNamespace() if !chop.Config().IsNamespaceWatched(ns) { log.V(2).M(cr).Info("skip enqueue, namespace '%s' is not watched or is in deny list", ns) diff --git a/pkg/controller/chi/worker-deleter.go b/pkg/controller/chi/worker-deleter.go index 6ff9e1e6c..736e77c75 100644 --- a/pkg/controller/chi/worker-deleter.go +++ b/pkg/controller/chi/worker-deleter.go @@ -382,7 +382,10 @@ func (w *worker) hasHostVolumesToRetain(ctx context.Context, host *api.Host) (ha // Check whether among all PVCs host has reclaim policy "retain" specified storage.NewStoragePVC(w.c.kube.Storage()).WalkDiscoveredPVCs(ctx, host, func(pvc *core.PersistentVolumeClaim) { if chiLabeler.New(nil).GetReclaimPolicy(pvc.GetObjectMeta()) == api.PVCReclaimPolicyRetain { - w.a.V(1).F().Info("PVC: %s/%s blocks drop replica. Reclaim policy: %s", api.PVCReclaimPolicyRetain.String()) + w.a.V(1).F().Info( + "PVC: %s/%s blocks drop replica. Reclaim policy: %s", + api.PVCReclaimPolicyRetain.String(), + ) has = true } }) @@ -451,7 +454,7 @@ func (a dropReplicaOptionsArr) First() *dropReplicaOptions { // dropZKReplica drops replica's info from Zookeeper func (w *worker) dropZKReplica(ctx context.Context, hostToDrop *api.Host, opts *dropReplicaOptions) error { if hostToDrop == nil { - w.a.V(1).F().Error("FAILED to drop replica. Need to have host to drop. hostToDrop: %s", hostToDrop.GetName()) + w.a.V(1).F().Error("FAILED to drop replica. Need to have host to drop. hostToDrop is nil") return nil } @@ -462,7 +465,7 @@ func (w *worker) dropZKReplica(ctx context.Context, hostToDrop *api.Host, opts * } if hostToRunOn == nil { - w.a.V(1).F().Error("FAILED to drop replica. hostToRunOn: %s, hostToDrop: %s", hostToRunOn.GetName(), hostToDrop.GetName()) + w.a.V(1).F().Error("FAILED to drop replica. hostToRunOn is nil, hostToDrop: %s", hostToDrop.GetName()) return nil } diff --git a/pkg/controller/chi/worker-reconciler-chi.go b/pkg/controller/chi/worker-reconciler-chi.go index 174ad5df2..631cc1a94 100644 --- a/pkg/controller/chi/worker-reconciler-chi.go +++ b/pkg/controller/chi/worker-reconciler-chi.go @@ -49,9 +49,6 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *api.ClickHouseInstal switch { case w.isAfterFinalizerInstalled(old, new): w.a.M(new).F().Info("isAfterFinalizerInstalled - continue reconcile-1") - case w.isGenerationTheSame(old, new): - log.V(2).M(new).F().Info("isGenerationTheSame() - nothing to do here, exit") - return nil } w.a.M(new).S().P() @@ -62,6 +59,8 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *api.ClickHouseInstal startTime := time.Now() new = w.buildCR(ctx, new) + generationTheSame := w.isGenerationTheSame(old, new) + hasUnhealthyHosts := w.hasUnhealthyHosts(ctx, new) switch { case new.Spec.Suspend.Value(): @@ -87,6 +86,12 @@ func (w *worker) reconcileCR(ctx context.Context, old, new *api.ClickHouseInstal metrics.CRReconcilesCompleted(ctx, new) } return nil + case generationTheSame && !hasUnhealthyHosts: + w.a.M(new).F().Info("isGenerationTheSame() and all hosts are healthy - nothing to do, exit") + metrics.CRReconcilesCompleted(ctx, new) + return nil + case generationTheSame && hasUnhealthyHosts: + w.a.M(new).F().Warning("Generation is unchanged, but unhealthy hosts detected - forcing steady-state recovery reconcile") case new.HasReconcileWork(): w.a.M(new).F().Info("CR has reconcile work - continue reconcile") case w.isAfterFinalizerInstalled(new.GetAncestorT(), new): @@ -401,8 +406,12 @@ func (w *worker) reconcileHostStatefulSet(ctx context.Context, host *api.Host, o // Start with force-restart host if w.shouldForceRestartHost(ctx, host) { - w.a.V(1).M(host).F().Info("Reconcile host STS force restart: %s", host.GetName()) - _ = w.hostForceRestart(ctx, host, opts) + if w.isHostHealthyForReconcile(ctx, host) && !w.isShardSafeToDisruptHost(ctx, host) { + w.a.V(1).M(host).F().Warning("Skip force restart for host due to shard safety guard (no healthy peer): %s", host.GetName()) + } else { + w.a.V(1).M(host).F().Info("Reconcile host STS force restart: %s", host.GetName()) + _ = w.hostForceRestart(ctx, host, opts) + } } w.stsReconciler.PrepareHostStatefulSetWithStatus(ctx, host, host.IsStopped()) @@ -670,12 +679,66 @@ func (w *worker) reconcileClusterShardsAndHosts(ctx context.Context, cluster *ap } func (w *worker) reconcileShardWithHosts(ctx context.Context, shard api.IShard) error { - if err := w.reconcileShard(ctx, shard); err != nil { + if err := w.reconcileShardWithHook(ctx, shard); err != nil { return err } - return shard.WalkHostsAbortOnError(func(host *api.Host) error { - return w.reconcileHost(ctx, host) + + recoveryHosts := make([]*api.Host, 0) + rolloutHosts := make([]*api.Host, 0) + shard.WalkHosts(func(host *api.Host) error { + if w.isHostHealthyForReconcile(ctx, host) { + rolloutHosts = append(rolloutHosts, host) + } else { + recoveryHosts = append(recoveryHosts, host) + } + return nil }) + + recoveredHosts := 0 + reconciledHosts := 0 + deferredHosts := 0 + + for _, host := range recoveryHosts { + w.a.V(1).M(host).F().Info("Recovery-first pass: reconciling unhealthy host before rollout") + if err := w.reconcileHostWithHook(ctx, host); err != nil { + return err + } + recoveredHosts++ + } + + for _, host := range rolloutHosts { + if w.hostMayRequireDisruption(ctx, host) && !w.isShardSafeToDisruptHost(ctx, host) { + w.a.V(1).M(host).F().Warning("Deferring host reconcile due to shard safety guard (no healthy peer). Host: %s", host.GetName()) + deferredHosts++ + continue + } + + if err := w.reconcileHostWithHook(ctx, host); err != nil { + return err + } + reconciledHosts++ + } + + if deferredHosts > 0 && recoveredHosts == 0 && reconciledHosts == 0 { + w.a.V(1).M(shard).F().Warning("No progress in shard reconcile: %d host(s) deferred by shard safety guard", deferredHosts) + return common.ErrCRUDAbort + } + + return nil +} + +func (w *worker) reconcileShardWithHook(ctx context.Context, shard api.IShard) error { + if w.reconcileShardFn != nil { + return w.reconcileShardFn(ctx, shard) + } + return w.reconcileShard(ctx, shard) +} + +func (w *worker) reconcileHostWithHook(ctx context.Context, host *api.Host) error { + if w.reconcileHostFn != nil { + return w.reconcileHostFn(ctx, host) + } + return w.reconcileHost(ctx, host) } // reconcileShard reconciles specified shard, excluding nested replicas diff --git a/pkg/controller/chi/worker-reconciler-shard_test.go b/pkg/controller/chi/worker-reconciler-shard_test.go new file mode 100644 index 000000000..34eb5699d --- /dev/null +++ b/pkg/controller/chi/worker-reconciler-shard_test.go @@ -0,0 +1,146 @@ +package chi + +import ( + "context" + "reflect" + "testing" + + api "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1" + "github.com/altinity/clickhouse-operator/pkg/apis/common/types" +) + +func makeTestShardFixture(hostNames ...string) (*api.ClickHouseInstallation, *api.ChiShard, []*api.Host) { + shard := &api.ChiShard{ + Name: "shard-0", + Hosts: make([]*api.Host, 0, len(hostNames)), + } + cluster := &api.Cluster{ + Name: "cluster-0", + Layout: &api.ChiClusterLayout{Shards: []*api.ChiShard{shard}}, + } + chi := &api.ClickHouseInstallation{ + Spec: api.ChiSpec{ + Configuration: &api.Configuration{Clusters: []*api.Cluster{cluster}}, + }, + } + cluster.Runtime.CHI = chi + shard.Runtime.CHI = chi + + hosts := make([]*api.Host, 0, len(hostNames)) + for i, hostName := range hostNames { + host := &api.Host{Name: hostName} + host.SetCR(chi) + host.Runtime.Address.ClusterName = cluster.Name + host.Runtime.Address.ShardName = shard.Name + host.Runtime.Address.HostName = hostName + host.Runtime.Address.ReplicaIndex = i + host.GetReconcileAttributes().SetStatus(types.ObjectStatusModified) + shard.Hosts = append(shard.Hosts, host) + hosts = append(hosts, host) + } + + return chi, shard, hosts +} + +func Test_isShardSafeToDisruptHost(t *testing.T) { + _, _, hosts := makeTestShardFixture("host-a", "host-b") + hostA := hosts[0] + hostB := hosts[1] + + health := map[string]bool{ + hostA.GetName(): true, + hostB.GetName(): false, + } + + w := &worker{ + hostHealthyFn: func(ctx context.Context, host *api.Host) bool { + return health[host.GetName()] + }, + } + + if w.isShardSafeToDisruptHost(context.Background(), hostA) { + t.Fatalf("expected shard to be unsafe to disrupt %s when peer is unhealthy", hostA.GetName()) + } + + health[hostB.GetName()] = true + if !w.isShardSafeToDisruptHost(context.Background(), hostA) { + t.Fatalf("expected shard to be safe to disrupt %s after peer recovery", hostA.GetName()) + } +} + +func Test_reconcileShardWithHosts_RecoveryFirstOrdering(t *testing.T) { + _, shard, hosts := makeTestShardFixture("host-a", "host-b") + hostA := hosts[0] + hostB := hosts[1] + + health := map[string]bool{ + hostA.GetName(): true, + hostB.GetName(): false, + } + order := make([]string, 0) + + w := &worker{ + hostHealthyFn: func(ctx context.Context, host *api.Host) bool { + return health[host.GetName()] + }, + reconcileShardFn: func(ctx context.Context, shard api.IShard) error { + return nil + }, + reconcileHostFn: func(ctx context.Context, host *api.Host) error { + order = append(order, host.GetName()) + if host.GetName() == hostB.GetName() { + // Simulate steady-state recovery before rollout continues. + health[hostB.GetName()] = true + } + return nil + }, + } + + if err := w.reconcileShardWithHosts(context.Background(), shard); err != nil { + t.Fatalf("reconcileShardWithHosts() unexpected error: %v", err) + } + + wantOrder := []string{hostB.GetName(), hostA.GetName()} + if !reflect.DeepEqual(order, wantOrder) { + t.Fatalf("unexpected reconcile order, got=%v want=%v", order, wantOrder) + } +} + +func Test_reconcileShardWithHosts_InterruptedRolloutAfterRestart_RecoversMissingReplicaFirst(t *testing.T) { + _, shard, hosts := makeTestShardFixture("chi-foo-1-0-0-0", "chi-foo-1-0-1-0") + host00 := hosts[0] + host01 := hosts[1] + + // Simulate restart in the middle of rollout: host01 is still down, host00 is up. + health := map[string]bool{ + host00.GetName(): true, + host01.GetName(): false, + } + order := make([]string, 0) + + w := &worker{ + hostHealthyFn: func(ctx context.Context, host *api.Host) bool { + return health[host.GetName()] + }, + reconcileShardFn: func(ctx context.Context, shard api.IShard) error { + return nil + }, + reconcileHostFn: func(ctx context.Context, host *api.Host) error { + order = append(order, host.GetName()) + if host.GetName() == host01.GetName() { + health[host01.GetName()] = true + } + return nil + }, + } + + if err := w.reconcileShardWithHosts(context.Background(), shard); err != nil { + t.Fatalf("reconcileShardWithHosts() unexpected error: %v", err) + } + + // Regression assertion: missing replica is recovered before any further disruption on the healthy replica. + wantOrder := []string{host01.GetName(), host00.GetName()} + if !reflect.DeepEqual(order, wantOrder) { + t.Fatalf("unexpected reconcile order after interrupted rollout, got=%v want=%v", order, wantOrder) + } +} diff --git a/pkg/controller/chi/worker-status-helpers.go b/pkg/controller/chi/worker-status-helpers.go index 8da4e493d..60860f714 100644 --- a/pkg/controller/chi/worker-status-helpers.go +++ b/pkg/controller/chi/worker-status-helpers.go @@ -73,6 +73,103 @@ func (w *worker) isPodOK(ctx context.Context, host *api.Host) bool { return false } +// isHostHealthyForReconcile reports whether host is healthy enough to proceed with shard disruption. +// Stopped and troubleshoot hosts are considered intentionally unavailable and do not block rollout safety checks. +func (w *worker) isHostHealthyForReconcile(ctx context.Context, host *api.Host) bool { + if w.hostHealthyFn != nil { + return w.hostHealthyFn(ctx, host) + } + + if host == nil { + return false + } + + if host.IsStopped() || host.IsTroubleshoot() { + return true + } + + if w == nil || w.c == nil || w.c.kube == nil { + return false + } + + if !w.isPodRunning(ctx, host) { + return false + } + if !w.isPodReady(ctx, host) { + return false + } + if w.isPodCrushed(ctx, host) { + return false + } + + return true +} + +func (w *worker) hasUnhealthyHosts(ctx context.Context, cr *api.ClickHouseInstallation) bool { + if cr == nil { + return false + } + + hasUnhealthy := false + cr.WalkHosts(func(host *api.Host) error { + if !w.isHostHealthyForReconcile(ctx, host) { + hasUnhealthy = true + } + return nil + }) + + return hasUnhealthy +} + +// isShardSafeToDisruptHost verifies there is at least one healthy peer in shard before disrupting host. +func (w *worker) isShardSafeToDisruptHost(ctx context.Context, host *api.Host) bool { + if host == nil || host.GetShard() == nil { + return true + } + + shard := host.GetShard() + if shard.HostsCount() <= 1 { + return true + } + + hasHealthyPeer := false + shard.WalkHosts(func(peer *api.Host) error { + if peer == nil { + return nil + } + if peer.GetName() == host.GetName() { + return nil + } + if w.isHostHealthyForReconcile(ctx, peer) { + hasHealthyPeer = true + } + return nil + }) + + return hasHealthyPeer +} + +// hostMayRequireDisruption returns true when host reconcile is expected to restart/exclude/update host. +func (w *worker) hostMayRequireDisruption(ctx context.Context, host *api.Host) bool { + if host == nil { + return false + } + if host.IsStopped() || host.IsTroubleshoot() { + return false + } + + if w.shouldForceRestartHost(ctx, host) { + return true + } + + status := host.GetReconcileAttributes().GetStatus() + if status.Is(types.ObjectStatusRequested) || status.Is(types.ObjectStatusSame) { + return false + } + + return true +} + func (w *worker) isPodRestarted(ctx context.Context, host *api.Host, initialRestartCounters map[string]int) bool { curRestartCounters, _ := w.c.kube.Pod().(interfaces.IKubePodEx).GetRestartCounters(ctx, host) return !util.MapsAreTheSame(initialRestartCounters, curRestartCounters) diff --git a/pkg/controller/chi/worker-wait-exclude-include-restart.go b/pkg/controller/chi/worker-wait-exclude-include-restart.go index 370aa2867..a406bc3ec 100644 --- a/pkg/controller/chi/worker-wait-exclude-include-restart.go +++ b/pkg/controller/chi/worker-wait-exclude-include-restart.go @@ -381,6 +381,22 @@ func (w *worker) shouldExcludeHost(ctx context.Context, host *api.Host) bool { return false case w.shouldForceRestartHost(ctx, host): + if !w.isHostHealthyForReconcile(ctx, host) { + w.a.V(1). + M(host).F(). + Info("Host requires restart but is not healthy now, no need to exclude before recovery. Host/shard/cluster: %d/%d/%s", + host.Runtime.Address.ReplicaIndex, host.Runtime.Address.ShardIndex, host.Runtime.Address.ClusterName) + return false + } + + if !w.isShardSafeToDisruptHost(ctx, host) { + w.a.V(1). + M(host).F(). + Warning("Host restart is deferred due to shard safety guard (no healthy peer). Host/shard/cluster: %d/%d/%s", + host.Runtime.Address.ReplicaIndex, host.Runtime.Address.ShardIndex, host.Runtime.Address.ClusterName) + return false + } + w.a.V(1). M(host).F(). Info("Host should be restarted, need to exclude. Host/shard/cluster: %d/%d/%s", diff --git a/pkg/controller/chi/worker.go b/pkg/controller/chi/worker.go index 25051045a..766a9687b 100644 --- a/pkg/controller/chi/worker.go +++ b/pkg/controller/chi/worker.go @@ -64,6 +64,11 @@ type worker struct { task *common.Task stsReconciler *statefulset.Reconciler + // Optional test hooks. Production code leaves these nil. + hostHealthyFn func(ctx context.Context, host *api.Host) bool + reconcileHostFn func(ctx context.Context, host *api.Host) error + reconcileShardFn func(ctx context.Context, shard api.IShard) error + start time.Time } @@ -342,12 +347,6 @@ func (w *worker) updateCHI(ctx context.Context, old, new *api.ClickHouseInstalla return nil } - if w.isCHIProcessedOnTheSameIP(new) { - // First minute after restart do not reconcile already reconciled generations - w.a.V(1).M(new).F().Info("Will not reconcile known generation after restart. Generation %d", new.Generation) - return nil - } - if util.IsContextDone(ctx) { log.V(1).Info("Reconcile is aborted.") return nil