diff --git a/internal/controller/handler.go b/internal/controller/handler.go index a8247a4ab7..17424f1951 100644 --- a/internal/controller/handler.go +++ b/internal/controller/handler.go @@ -230,6 +230,22 @@ func (h *eventHandlerImpl) sendNginxConfig(ctx context.Context, logger logr.Logg continue } + if gatewayHasPendingWAFBundle(gr, gw) { + // Fail-closed: a WAF bundle for this Gateway has not yet been fetched. + // Withhold the config push rather than serve without WAF protection. + // Enqueue a status update so the pending condition is visible to the operator. + obj := &status.QueueObject{ + UpdateType: status.UpdateAll, + Deployment: status.Deployment{ + NamespacedName: gw.DeploymentName, + GatewayName: gw.Source.GetName(), + }, + Error: errors.New("NGINX configuration update withheld: WAF bundle for Gateway is still pending"), + } + h.cfg.statusQueue.Enqueue(obj) + continue + } + deployment := h.cfg.nginxDeployments.GetOrStore(ctx, gw.DeploymentName, gw.Source.GetName()) if deployment == nil { panic("expected deployment, got nil") @@ -364,6 +380,51 @@ func (h *eventHandlerImpl) reconcileWAFPollers(ctx context.Context, gr *graph.Gr h.cfg.wafPollerManager.StopPollersNotIn(activePolicies) } +// gatewayHasPendingWAFBundle returns true if any WAFGatewayBindingPolicy that targets this Gateway +// (directly or via an attached route) has BundlePending=true. +// When true, the Gateway config push must be withheld to maintain fail-closed posture. +func gatewayHasPendingWAFBundle(gr *graph.Graph, gw *graph.Gateway) bool { + gwNsName := types.NamespacedName{ + Namespace: gw.Source.GetNamespace(), + Name: gw.Source.GetName(), + } + + for key, policy := range gr.NGFPolicies { + if key.GVK.Kind != kinds.WAFGatewayBindingPolicy { + continue + } + if policy.WAFState == nil || !policy.WAFState.BundlePending { + continue + } + if _, invalid := policy.InvalidForGateways[gwNsName]; invalid { + continue + } + for _, ref := range policy.TargetRefs { + switch ref.Kind { + case kinds.Gateway: + if ref.Nsname == gwNsName { + return true + } + case kinds.HTTPRoute, kinds.GRPCRoute: + routeKey := graph.RouteKey{ + NamespacedName: ref.Nsname, + RouteType: routeTypeForKind(ref.Kind), + } + route, exists := gr.Routes[routeKey] + if !exists || !route.Valid { + continue + } + for _, parentRef := range route.ParentRefs { + if parentRef.Gateway != nil && parentRef.Gateway.NamespacedName == gwNsName { + return true + } + } + } + } + } + return false +} + // collectPolicyTargetDeployments returns the unique set of deployment names that a policy targets. // It handles policies targeting Gateways directly, as well as policies targeting HTTPRoutes/GRPCRoutes // (which are attached to Gateways via ParentRefs). @@ -688,6 +749,14 @@ func (h *eventHandlerImpl) parseAndCaptureEvent(ctx context.Context, logger logr } h.cfg.processor.CaptureDeleteChange(e.Type, e.NamespacedName) + case events.WAFBundleReconcileEvent: + logger.V(1).Info("WAF bundle now available, triggering re-reconcile", "policy", e.PolicyNsName) + // Mark the processor dirty so Process() performs a graph rebuild even if this is the + // only event in the batch. Without this, clusterStateChanged=false causes Process() to + // return nil and the pending Gateway is never unblocked. + // We do not call CaptureUpsertChange here because that would overwrite the real policy + // object in cluster state with a metadata-only stub, corrupting the next graph build. + h.cfg.processor.ForceRebuild() default: panic(fmt.Errorf("unknown event type %T", e)) } diff --git a/internal/controller/handler_test.go b/internal/controller/handler_test.go index 23ef24697f..14d45f8baf 100644 --- a/internal/controller/handler_test.go +++ b/internal/controller/handler_test.go @@ -727,6 +727,60 @@ var _ = Describe("eventHandler", func() { Expect(handler.GetLatestConfiguration()).To(BeEmpty()) }) + It("should withhold config push and enqueue status update when WAF bundle is pending", func() { + gwNsName := types.NamespacedName{Namespace: "test", Name: "gateway"} + pendingGraph := &graph.Graph{ + Gateways: map[types.NamespacedName]*graph.Gateway{ + gwNsName: { + Valid: true, + Source: &gatewayv1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: gwNsName.Namespace, + Name: gwNsName.Name, + }, + }, + DeploymentName: types.NamespacedName{Namespace: "test", Name: "gateway-nginx"}, + }, + }, + NGFPolicies: map[graph.PolicyKey]*graph.Policy{ + wafPolicyKey("waf-policy"): { + Source: makeWAFPolicy(false), + WAFState: &graph.PolicyWAFState{ + BundlePending: true, + }, + TargetRefs: []graph.PolicyTargetRef{ + {Kind: kinds.Gateway, Nsname: gwNsName}, + }, + }, + }, + } + + fakeProcessor.ProcessReturns(pendingGraph) + fakeProcessor.GetLatestGraphReturns(pendingGraph) + + e := &events.UpsertEvent{Resource: &gatewayv1.Gateway{}} + handler.HandleEventBatch(context.Background(), logr.Discard(), []any{e}) + + Expect(fakeNginxUpdater.UpdateConfigCallCount()).To(Equal(0)) + // Status update is consumed by waitForStatusUpdates and triggers UpdateGroup. + // Use Eventually because waitForStatusUpdates runs in a separate goroutine. + Eventually(fakeStatusUpdater.UpdateGroupCallCount).Should(BeNumerically(">=", 1)) + }) + + It("should handle WAFBundleReconcileEvent without panicking and mark processor dirty", func() { + e := events.WAFBundleReconcileEvent{ + PolicyNsName: types.NamespacedName{Namespace: "default", Name: "my-waf-policy"}, + } + + handle := func() { + batch := []any{e} + handler.HandleEventBatch(context.Background(), logr.Discard(), batch) + } + + Expect(handle).ShouldNot(Panic()) + Expect(fakeProcessor.ForceRebuildCallCount()).To(Equal(1)) + }) + It("should process events with volume mounts from Deployment", func() { // Create a gateway with EffectiveNginxProxy containing Deployment VolumeMounts gatewayWithVolumeMounts := &graph.Graph{ @@ -1901,6 +1955,157 @@ func TestCollectPolicyTargetDeployments(t *testing.T) { } } +func TestGatewayHasPendingWAFBundle(t *testing.T) { + t.Parallel() + + gwNsName := types.NamespacedName{Namespace: "default", Name: "my-gateway"} + gw := &graph.Gateway{ + Valid: true, + Source: &gatewayv1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: gwNsName.Namespace, + Name: gwNsName.Name, + }, + }, + DeploymentName: types.NamespacedName{Namespace: "default", Name: "nginx-dep"}, + } + + makeWAFPolicy := func( + pending bool, + targetKind gatewayv1.Kind, + targetNsName types.NamespacedName, + invalidForGateways ...types.NamespacedName, + ) *graph.Policy { + var wafState *graph.PolicyWAFState + if pending { + wafState = &graph.PolicyWAFState{BundlePending: true} + } else { + wafState = &graph.PolicyWAFState{BundlePending: false} + } + invalid := make(map[types.NamespacedName]struct{}, len(invalidForGateways)) + for _, ns := range invalidForGateways { + invalid[ns] = struct{}{} + } + return &graph.Policy{ + Source: &ngfAPI.WAFGatewayBindingPolicy{}, + WAFState: wafState, + InvalidForGateways: invalid, + TargetRefs: []graph.PolicyTargetRef{ + {Kind: targetKind, Nsname: targetNsName}, + }, + } + } + + tests := []struct { + policies map[graph.PolicyKey]*graph.Policy + routes map[graph.RouteKey]*graph.L7Route + name string + expPending bool + }{ + { + name: "no policies returns false", + policies: map[graph.PolicyKey]*graph.Policy{}, + expPending: false, + }, + { + name: "pending policy targeting gateway directly returns true", + policies: map[graph.PolicyKey]*graph.Policy{ + wafPolicyKey("waf"): makeWAFPolicy(true, kinds.Gateway, gwNsName), + }, + expPending: true, + }, + { + name: "non-pending policy targeting gateway returns false", + policies: map[graph.PolicyKey]*graph.Policy{ + wafPolicyKey("waf"): makeWAFPolicy(false, kinds.Gateway, gwNsName), + }, + expPending: false, + }, + { + name: "pending policy targeting a different gateway returns false", + policies: map[graph.PolicyKey]*graph.Policy{ + wafPolicyKey("waf"): makeWAFPolicy( + true, kinds.Gateway, + types.NamespacedName{Namespace: "default", Name: "other-gw"}, + ), + }, + expPending: false, + }, + { + name: "pending policy targeting HTTPRoute attached to gateway returns true", + policies: map[graph.PolicyKey]*graph.Policy{ + wafPolicyKey("waf"): makeWAFPolicy( + true, kinds.HTTPRoute, + types.NamespacedName{Namespace: "default", Name: "my-route"}, + ), + }, + routes: map[graph.RouteKey]*graph.L7Route{ + {NamespacedName: types.NamespacedName{Namespace: "default", Name: "my-route"}, RouteType: graph.RouteTypeHTTP}: { + Valid: true, + ParentRefs: []graph.ParentRef{ + {Gateway: &graph.ParentRefGateway{NamespacedName: gwNsName}}, + }, + }, + }, + expPending: true, + }, + { + name: "nil WAFState returns false", + policies: map[graph.PolicyKey]*graph.Policy{ + wafPolicyKey("waf"): { + Source: &ngfAPI.WAFGatewayBindingPolicy{}, + WAFState: nil, + TargetRefs: []graph.PolicyTargetRef{{Kind: kinds.Gateway, Nsname: gwNsName}}, + }, + }, + expPending: false, + }, + { + name: "pending policy with gateway in InvalidForGateways returns false", + policies: map[graph.PolicyKey]*graph.Policy{ + wafPolicyKey("waf"): makeWAFPolicy(true, kinds.Gateway, gwNsName, gwNsName), + }, + expPending: false, + }, + { + name: "pending policy via route with gateway in InvalidForGateways returns false", + policies: map[graph.PolicyKey]*graph.Policy{ + wafPolicyKey("waf"): makeWAFPolicy( + true, kinds.HTTPRoute, + types.NamespacedName{Namespace: "default", Name: "my-route"}, + gwNsName, + ), + }, + routes: map[graph.RouteKey]*graph.L7Route{ + { + NamespacedName: types.NamespacedName{Namespace: "default", Name: "my-route"}, + RouteType: graph.RouteTypeHTTP, + }: { + Valid: true, + ParentRefs: []graph.ParentRef{ + {Gateway: &graph.ParentRefGateway{NamespacedName: gwNsName}}, + }, + }, + }, + expPending: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + gr := &graph.Graph{ + NGFPolicies: tt.policies, + Routes: tt.routes, + } + + g.Expect(gatewayHasPendingWAFBundle(gr, gw)).To(Equal(tt.expPending)) + }) + } +} + func TestFindWAFPolicyKey(t *testing.T) { t.Parallel() diff --git a/internal/controller/manager.go b/internal/controller/manager.go index bfbc6a5c5b..6d8ba18375 100644 --- a/internal/controller/manager.go +++ b/internal/controller/manager.go @@ -195,7 +195,7 @@ func StartManager(cfg config.Config) error { return err } - wafPollerManager = createWAFPollerManager(cfg, wafFetcher, nginxUpdater, statusQueue) + wafPollerManager = createWAFPollerManager(ctx, cfg, wafFetcher, nginxUpdater, statusQueue, eventCh) eventHandler := newEventHandlerImpl(eventHandlerConfig{ ctx: ctx, @@ -361,10 +361,12 @@ func createAndRegisterProvisioner( // createWAFPollerManager creates a WAF polling manager if Plus is enabled. // Returns nil when Plus is not enabled. func createWAFPollerManager( + ctx context.Context, cfg config.Config, wafFetcher fetch.Fetcher, nginxUpdater *agent.NginxUpdaterImpl, statusQueue *status.Queue, + eventCh chan<- any, ) wafpolling.PollerManager { if !cfg.Plus { return nil @@ -374,6 +376,8 @@ func createWAFPollerManager( Logger: cfg.Logger.WithName("wafPollingManager"), Fetcher: wafFetcher, Deployments: nginxUpdater.NginxDeployments, + EventCh: eventCh, + Ctx: ctx, StatusCallback: func(targets []types.NamespacedName) { for _, nsName := range targets { dep := nginxUpdater.NginxDeployments.Get(nsName) diff --git a/internal/controller/state/change_processor.go b/internal/controller/state/change_processor.go index ce9b430ac1..2bafe156c8 100644 --- a/internal/controller/state/change_processor.go +++ b/internal/controller/state/change_processor.go @@ -49,6 +49,10 @@ type ChangeProcessor interface { Process(ctx context.Context) (graphCfg *graph.Graph) // GetLatestGraph returns the latest Graph. GetLatestGraph() *graph.Graph + // ForceRebuild forces the next Process() call to perform a full graph rebuild, + // without modifying the cluster state. Used when an external event (e.g. a WAF bundle + // becoming available) must trigger a rebuild without an accompanying resource change. + ForceRebuild() } // ChangeProcessorConfig holds configuration parameters for ChangeProcessorImpl. @@ -93,6 +97,8 @@ type ChangeProcessorImpl struct { updater Updater // getAndResetClusterStateChanged tells if and how the cluster state has changed. getAndResetClusterStateChanged func() bool + // forceClusterStateRebuild forces the changed flag to true without modifying cluster state. + forceClusterStateRebuild func() cfg ChangeProcessorConfig lock sync.Mutex @@ -283,6 +289,7 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl { ) processor.getAndResetClusterStateChanged = trackingUpdater.getAndResetChangedStatus + processor.forceClusterStateRebuild = trackingUpdater.forceRebuild processor.updater = trackingUpdater return processor @@ -317,6 +324,14 @@ func (c *ChangeProcessorImpl) CaptureDeleteChange(resourceType ngftypes.ObjectTy c.updater.Delete(resourceType, nsname) } +// ForceRebuild forces the next Process() call to rebuild the graph without modifying cluster state. +func (c *ChangeProcessorImpl) ForceRebuild() { + c.lock.Lock() + defer c.lock.Unlock() + + c.forceClusterStateRebuild() +} + func (c *ChangeProcessorImpl) Process(ctx context.Context) *graph.Graph { c.lock.Lock() defer c.lock.Unlock() diff --git a/internal/controller/state/conditions/conditions.go b/internal/controller/state/conditions/conditions.go index 8ae9a5885b..c547077207 100644 --- a/internal/controller/state/conditions/conditions.go +++ b/internal/controller/state/conditions/conditions.go @@ -1501,3 +1501,15 @@ func NewPolicyProgrammedStaleBundleWarning(errMsg string) Condition { Message: fmt.Sprintf("Bundle fetch failed; using previously fetched bundle: %s", errMsg), } } + +// NewPolicyNotProgrammedBundlePending returns a Condition that indicates the WAF bundle has not +// yet been successfully fetched. The Gateway config push is withheld until the bundle is available, +// maintaining a fail-closed posture. +func NewPolicyNotProgrammedBundlePending(errMsg string) Condition { + return Condition{ + Type: string(WAFProgrammedConditionType), + Status: metav1.ConditionFalse, + Reason: string(PolicyReasonPending), + Message: fmt.Sprintf("Waiting for WAF bundle; last fetch error: %s", errMsg), + } +} diff --git a/internal/controller/state/graph/policies.go b/internal/controller/state/graph/policies.go index 9370752f86..c502b7f2ae 100644 --- a/internal/controller/state/graph/policies.go +++ b/internal/controller/state/graph/policies.go @@ -59,6 +59,10 @@ type PolicyWAFState struct { ResolvedAuth *fetch.BundleAuth // ResolvedTLSCA contains the resolved TLS CA certificate data for WAF bundle fetching. ResolvedTLSCA []byte + // BundlePending is true when the policy's bundle has never been successfully fetched + // (cold-miss on startup or after all retries are exhausted with no previous bundle). + // The Gateway config push is withheld until this is resolved to maintain fail-closed posture. + BundlePending bool } // PolicyAncestor represents an ancestor of a Policy. @@ -1015,9 +1019,9 @@ func fetchPolicyBundle( policy.WAFState.Bundles[bundleKey] = prev return } - cond := conditions.NewPolicyNotProgrammedBundleFetchError(err.Error()) + cond := conditions.NewPolicyNotProgrammedBundlePending(err.Error()) policy.Conditions = append(policy.Conditions, cond) - policy.Valid = false + policy.WAFState.BundlePending = true return } @@ -1093,9 +1097,9 @@ func fetchSecurityLogBundles( policy.WAFState.Bundles[bundleKey] = prev continue } - cond := conditions.NewPolicyNotProgrammedBundleFetchError(err.Error()) + cond := conditions.NewPolicyNotProgrammedBundlePending(err.Error()) policy.Conditions = append(policy.Conditions, cond) - policy.Valid = false + policy.WAFState.BundlePending = true continue } diff --git a/internal/controller/state/graph/policies_test.go b/internal/controller/state/graph/policies_test.go index 414bda8194..e2c4a2f21f 100644 --- a/internal/controller/state/graph/policies_test.go +++ b/internal/controller/state/graph/policies_test.go @@ -2913,6 +2913,7 @@ func TestProcessWAFGatewayBindingPolicies(t *testing.T) { expConditions func(pol *Policy) []conditions.Condition name string expValid bool + expBundlePending bool }{ { name: "nil wafInput returns nil", @@ -2987,7 +2988,7 @@ func TestProcessWAFGatewayBindingPolicies(t *testing.T) { expValid: true, }, { - name: "fetch error with no previous bundle marks policy invalid", + name: "fetch error with no previous bundle sets policy pending (fail-closed)", processedPolicies: func() map[PolicyKey]*Policy { wafPolicy := makeWAFPolicy(policyName, false, false, false) key, pol := makePolicyEntry(wafPolicy, true) @@ -3006,9 +3007,10 @@ func TestProcessWAFGatewayBindingPolicies(t *testing.T) { expBundles: map[WAFBundleKey]*WAFBundleData{}, expSecrets: map[types.NamespacedName]*corev1.Secret{}, expConditions: func(_ *Policy) []conditions.Condition { - return []conditions.Condition{conditions.NewPolicyNotProgrammedBundleFetchError("fetch failed")} + return []conditions.Condition{conditions.NewPolicyNotProgrammedBundlePending("fetch failed")} }, - expValid: false, + expValid: true, + expBundlePending: true, }, { name: "fetch error with previous bundle uses stale bundle and adds warning condition", @@ -3220,7 +3222,7 @@ func TestProcessWAFGatewayBindingPolicies(t *testing.T) { expValid: true, }, { - name: "security log fetch error with no previous bundle marks policy invalid", + name: "security log fetch error with no previous bundle sets policy pending (fail-closed)", processedPolicies: func() map[PolicyKey]*Policy { wafPolicy := makeWAFPolicy(policyName, false, false, true) key, pol := makePolicyEntry(wafPolicy, true) @@ -3242,9 +3244,10 @@ func TestProcessWAFGatewayBindingPolicies(t *testing.T) { }, expSecrets: map[types.NamespacedName]*corev1.Secret{}, expConditions: func(_ *Policy) []conditions.Condition { - return []conditions.Condition{conditions.NewPolicyNotProgrammedBundleFetchError("log fetch failed")} + return []conditions.Condition{conditions.NewPolicyNotProgrammedBundlePending("log fetch failed")} }, - expValid: false, + expValid: true, + expBundlePending: true, }, { name: "NIM managed source sets PolicyName on fetch request", @@ -3607,6 +3610,9 @@ func TestProcessWAFGatewayBindingPolicies(t *testing.T) { } else if pol.Conditions != nil { g.Expect(pol.Valid).To(BeFalse()) } + if pol.WAFState != nil { + g.Expect(pol.WAFState.BundlePending).To(Equal(tc.expBundlePending)) + } } }) } diff --git a/internal/controller/state/statefakes/fake_change_processor.go b/internal/controller/state/statefakes/fake_change_processor.go index d2bfc9086e..a71bc6217d 100644 --- a/internal/controller/state/statefakes/fake_change_processor.go +++ b/internal/controller/state/statefakes/fake_change_processor.go @@ -24,6 +24,10 @@ type FakeChangeProcessor struct { captureUpsertChangeArgsForCall []struct { arg1 client.Object } + ForceRebuildStub func() + forceRebuildMutex sync.RWMutex + forceRebuildArgsForCall []struct { + } GetLatestGraphStub func() *graph.Graph getLatestGraphMutex sync.RWMutex getLatestGraphArgsForCall []struct { @@ -114,6 +118,30 @@ func (fake *FakeChangeProcessor) CaptureUpsertChangeArgsForCall(i int) client.Ob return argsForCall.arg1 } +func (fake *FakeChangeProcessor) ForceRebuild() { + fake.forceRebuildMutex.Lock() + fake.forceRebuildArgsForCall = append(fake.forceRebuildArgsForCall, struct { + }{}) + stub := fake.ForceRebuildStub + fake.recordInvocation("ForceRebuild", []interface{}{}) + fake.forceRebuildMutex.Unlock() + if stub != nil { + fake.ForceRebuildStub() + } +} + +func (fake *FakeChangeProcessor) ForceRebuildCallCount() int { + fake.forceRebuildMutex.RLock() + defer fake.forceRebuildMutex.RUnlock() + return len(fake.forceRebuildArgsForCall) +} + +func (fake *FakeChangeProcessor) ForceRebuildCalls(stub func()) { + fake.forceRebuildMutex.Lock() + defer fake.forceRebuildMutex.Unlock() + fake.ForceRebuildStub = stub +} + func (fake *FakeChangeProcessor) GetLatestGraph() *graph.Graph { fake.getLatestGraphMutex.Lock() ret, specificReturn := fake.getLatestGraphReturnsOnCall[len(fake.getLatestGraphArgsForCall)] diff --git a/internal/controller/state/store.go b/internal/controller/state/store.go index dab3f4244d..14e1bde582 100644 --- a/internal/controller/state/store.go +++ b/internal/controller/state/store.go @@ -342,3 +342,10 @@ func (s *changeTrackingUpdater) getAndResetChangedStatus() bool { s.changed = false return changed } + +// forceRebuild forces the changed status to true without modifying any cluster state. +// This is used when an external event (e.g., a WAF bundle becoming available) must trigger +// a graph rebuild without touching the object stores. +func (s *changeTrackingUpdater) forceRebuild() { + s.changed = true +} diff --git a/internal/framework/events/event.go b/internal/framework/events/event.go index f203ca1ec0..1efc20f43f 100644 --- a/internal/framework/events/event.go +++ b/internal/framework/events/event.go @@ -23,3 +23,11 @@ type DeleteEvent struct { // NamespacedName is the namespace & name of the deleted resource. NamespacedName types.NamespacedName } + +// WAFBundleReconcileEvent is injected by the WAF poller manager when a bundle that was previously +// unavailable has been successfully fetched for the first time. +// It signals the event handler to re-reconcile the affected policy so the Gateway config push proceeds. +type WAFBundleReconcileEvent struct { + // PolicyNsName is the namespace/name of the WAFGatewayBindingPolicy whose bundle is now available. + PolicyNsName types.NamespacedName +} diff --git a/internal/framework/waf/manager.go b/internal/framework/waf/manager.go index 7f4ddc332b..83d7bcbb13 100644 --- a/internal/framework/waf/manager.go +++ b/internal/framework/waf/manager.go @@ -10,6 +10,7 @@ import ( "github.com/nginx/nginx-gateway-fabric/v2/internal/controller/nginx/agent" "github.com/nginx/nginx-gateway-fabric/v2/internal/controller/state/graph" + "github.com/nginx/nginx-gateway-fabric/v2/internal/framework/events" "github.com/nginx/nginx-gateway-fabric/v2/internal/framework/fetch" ) @@ -42,14 +43,23 @@ type PollerManager interface { // Manager manages the lifecycle of all WAF bundle pollers. // It creates, tracks, and stops pollers as WAFGatewayBindingPolicies are created, updated, or deleted. type Manager struct { - fetcher fetch.Fetcher - deployments agent.DeploymentStorer - pollers map[types.NamespacedName]*pollerEntry - pollErrors map[types.NamespacedName]*PollError - bundleCache map[graph.WAFBundleKey]*graph.WAFBundleData - statusCallback func(targets []types.NamespacedName) - logger logr.Logger - mu sync.RWMutex + fetcher fetch.Fetcher + deployments agent.DeploymentStorer + pollers map[types.NamespacedName]*pollerEntry + pollErrors map[types.NamespacedName]*PollError + bundleCache map[graph.WAFBundleKey]*graph.WAFBundleData + // bundleKeyToPolicy maps each bundle key to the policy that owns it. + // Used to look up the policy namespace/name when injecting a WAFBundleReconcileEvent. + bundleKeyToPolicy map[graph.WAFBundleKey]types.NamespacedName + statusCallback func(targets []types.NamespacedName) + // eventCh is the send side of the main event loop channel. + // A WAFBundleReconcileEvent is sent when a previously-pending bundle is first fetched successfully, + // triggering an immediate re-reconcile so the Gateway config push can proceed. + eventCh chan<- any + // ctx is the root context for the manager, used to cancel goroutines on shutdown. + ctx context.Context + logger logr.Logger + mu sync.RWMutex } // pollerEntry holds a poller and its cancellation function. @@ -63,19 +73,31 @@ type ManagerConfig struct { Fetcher fetch.Fetcher Deployments agent.DeploymentStorer StatusCallback func(targets []types.NamespacedName) - Logger logr.Logger + EventCh chan<- any + // Ctx is the root context for the manager lifetime. + // It is used to cancel goroutines that inject events into the event loop on shutdown. + Ctx context.Context + Logger logr.Logger } // NewManager creates a new Manager. +// It panics if EventCh is set without Ctx, as the event-injection goroutine requires +// a context to avoid leaking on shutdown. func NewManager(cfg ManagerConfig) *Manager { + if cfg.EventCh != nil && cfg.Ctx == nil { + panic("waf.ManagerConfig: Ctx must be set when EventCh is set") + } return &Manager{ - logger: cfg.Logger, - fetcher: cfg.Fetcher, - deployments: cfg.Deployments, - pollers: make(map[types.NamespacedName]*pollerEntry), - pollErrors: make(map[types.NamespacedName]*PollError), - bundleCache: make(map[graph.WAFBundleKey]*graph.WAFBundleData), - statusCallback: cfg.StatusCallback, + logger: cfg.Logger, + fetcher: cfg.Fetcher, + deployments: cfg.Deployments, + pollers: make(map[types.NamespacedName]*pollerEntry), + pollErrors: make(map[types.NamespacedName]*PollError), + bundleCache: make(map[graph.WAFBundleKey]*graph.WAFBundleData), + bundleKeyToPolicy: make(map[graph.WAFBundleKey]types.NamespacedName), + statusCallback: cfg.StatusCallback, + eventCh: cfg.EventCh, + ctx: cfg.Ctx, } } @@ -124,6 +146,7 @@ func (m *Manager) startPoller(ctx context.Context, cfg PollerConfig) { m.logger.V(1).Info("Stopping existing poller before starting new one", "policy", cfg.PolicyNsName) entry.cancel() delete(m.pollErrors, cfg.PolicyNsName) + m.clearBundleCacheLocked(entry.poller) } pollerCtx, cancel := context.WithCancel(ctx) //nolint:gosec // Cancel is handled externally to this function @@ -139,6 +162,11 @@ func (m *Manager) startPoller(ctx context.Context, cfg PollerConfig) { } } + // Record which policy owns each bundle key so cacheBundleUpdate can inject the correct event. + for _, src := range cfg.Sources { + m.bundleKeyToPolicy[src.BundleKey] = cfg.PolicyNsName + } + poller = newPoller(pollerConfig{ logger: m.logger, policyNsName: cfg.PolicyNsName, @@ -193,14 +221,43 @@ func (m *Manager) recordPollResult(policyNsName types.NamespacedName, bundleKey // cacheBundleUpdate stores the latest successfully polled bundle data in the manager's cache. // This is called by pollers when they detect a changed bundle, ensuring the freshest data // is available for graph rebuild stale-bundle fallback. +// On the first time a bundle key appears in this manager's cache, a WAFBundleReconcileEvent +// is injected into the event loop to trigger an immediate graph rebuild. +// Note: this fires on any first-cache event, including after a poller restart that cleared the +// cache — not only when the policy was previously in BundlePending state. A spurious reconcile +// event in that case is harmless: it triggers an unnecessary graph rebuild but causes no +// incorrect behavior. func (m *Manager) cacheBundleUpdate(bundleKey graph.WAFBundleKey, data []byte, checksum string) { m.mu.Lock() - defer m.mu.Unlock() + + _, alreadyCached := m.bundleCache[bundleKey] m.bundleCache[bundleKey] = &graph.WAFBundleData{ Data: data, Checksum: checksum, } + + // Capture event details while holding the lock, then release before sending. + var event *events.WAFBundleReconcileEvent + if !alreadyCached { + if policyNsName, ok := m.bundleKeyToPolicy[bundleKey]; ok && m.eventCh != nil { + event = &events.WAFBundleReconcileEvent{PolicyNsName: policyNsName} + } + } + + m.mu.Unlock() + + // Send the reconcile event after releasing the lock so other manager operations are not + // blocked on the mutex while waiting for the event loop. The manager's root context is + // used as a cancellation escape hatch: on shutdown, the event loop exits before the + // manager's context is canceled, so without this the poller goroutine could block + // indefinitely trying to send to an already-drained channel. + if event != nil { + select { + case m.eventCh <- *event: + case <-m.ctx.Done(): + } + } } // GetAllPollErrors returns a deep copy of all current poll errors. @@ -262,6 +319,7 @@ func (m *Manager) stopAll() { m.pollers = make(map[types.NamespacedName]*pollerEntry) m.pollErrors = make(map[types.NamespacedName]*PollError) m.bundleCache = make(map[graph.WAFBundleKey]*graph.WAFBundleData) + m.bundleKeyToPolicy = make(map[graph.WAFBundleKey]types.NamespacedName) m.mu.Unlock() for _, entry := range entries { @@ -271,11 +329,12 @@ func (m *Manager) stopAll() { m.logger.Info("Stopped all WAF pollers", "count", len(entries)) } -// clearBundleCacheLocked removes cached bundle data for all bundle keys owned by the given poller. -// Must be called while m.mu is held. +// clearBundleCacheLocked removes cached bundle data and policy mappings for all bundle keys +// owned by the given poller. Must be called while m.mu is held. func (m *Manager) clearBundleCacheLocked(p *poller) { for _, src := range p.getSources() { delete(m.bundleCache, src.BundleKey) + delete(m.bundleKeyToPolicy, src.BundleKey) } } diff --git a/internal/framework/waf/manager_test.go b/internal/framework/waf/manager_test.go index 044980cfae..3be3b38c25 100644 --- a/internal/framework/waf/manager_test.go +++ b/internal/framework/waf/manager_test.go @@ -12,6 +12,7 @@ import ( "github.com/nginx/nginx-gateway-fabric/v2/internal/controller/nginx/agent/agentfakes" "github.com/nginx/nginx-gateway-fabric/v2/internal/controller/state/graph" + "github.com/nginx/nginx-gateway-fabric/v2/internal/framework/events" "github.com/nginx/nginx-gateway-fabric/v2/internal/framework/fetch" "github.com/nginx/nginx-gateway-fabric/v2/internal/framework/fetch/fetchfakes" ) @@ -673,6 +674,103 @@ func TestManager_GetLatestBundles(t *testing.T) { }) } +func TestManager_cacheBundleUpdateInjectsReconcileEvent(t *testing.T) { + t.Parallel() + + policyNsName := types.NamespacedName{Namespace: "default", Name: "my-policy"} + bundleKey := graph.WAFBundleKey("default_my-policy") + + t.Run("injects WAFBundleReconcileEvent on first fetch for known bundle key", func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + eventCh := make(chan any, 1) + mgr := NewManager(ManagerConfig{ + Logger: logr.Discard(), + Fetcher: &fetchfakes.FakeFetcher{}, + Deployments: &agentfakes.FakeDeploymentStorer{}, + EventCh: eventCh, + Ctx: context.Background(), + }) + + mgr.bundleKeyToPolicy[bundleKey] = policyNsName + + mgr.cacheBundleUpdate(bundleKey, []byte("data"), "checksum") + + g.Expect(eventCh).To(Receive(Equal(events.WAFBundleReconcileEvent{PolicyNsName: policyNsName}))) + }) + + t.Run("does not inject event on subsequent fetches for the same key", func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + eventCh := make(chan any, 2) + mgr := NewManager(ManagerConfig{ + Logger: logr.Discard(), + Fetcher: &fetchfakes.FakeFetcher{}, + Deployments: &agentfakes.FakeDeploymentStorer{}, + EventCh: eventCh, + Ctx: context.Background(), + }) + + mgr.bundleKeyToPolicy[bundleKey] = policyNsName + + mgr.cacheBundleUpdate(bundleKey, []byte("data-v1"), "checksum-v1") + // Consume the first event, then verify no second event is sent. + g.Expect(eventCh).To(Receive()) + mgr.cacheBundleUpdate(bundleKey, []byte("data-v2"), "checksum-v2") + g.Consistently(eventCh).ShouldNot(Receive()) + }) + + t.Run("does not inject event when eventCh is nil", func(t *testing.T) { + t.Parallel() + + mgr := NewManager(ManagerConfig{ + Logger: logr.Discard(), + Fetcher: &fetchfakes.FakeFetcher{}, + Deployments: &agentfakes.FakeDeploymentStorer{}, + }) + + mgr.bundleKeyToPolicy[bundleKey] = policyNsName + + // Must not panic. + mgr.cacheBundleUpdate(bundleKey, []byte("data"), "checksum") + }) + + t.Run("panics when EventCh is set without Ctx", func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + g.Expect(func() { + NewManager(ManagerConfig{ + Logger: logr.Discard(), + Fetcher: &fetchfakes.FakeFetcher{}, + Deployments: &agentfakes.FakeDeploymentStorer{}, + EventCh: make(chan any, 1), + }) + }).To(Panic()) + }) + + t.Run("does not inject event when bundle key has no policy mapping", func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + + eventCh := make(chan any, 1) + mgr := NewManager(ManagerConfig{ + Logger: logr.Discard(), + Fetcher: &fetchfakes.FakeFetcher{}, + Deployments: &agentfakes.FakeDeploymentStorer{}, + EventCh: eventCh, + Ctx: context.Background(), + }) + + // No entry in bundleKeyToPolicy. + mgr.cacheBundleUpdate(bundleKey, []byte("data"), "checksum") + + g.Expect(eventCh).To(BeEmpty()) + }) +} + func TestManager_StatusCallbackViaConfig(t *testing.T) { t.Parallel() g := NewWithT(t)