Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions internal/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,21 @@ func (h *eventHandlerImpl) sendNginxConfig(ctx context.Context, logger logr.Logg
continue
}

if gatewayHasPendingWAFBundle(gr, gw) {
// Fail-closed: a WAF bundle for this Gateway has not yet been fetched.
// Withhold the config push rather than serve without WAF protection.
// Enqueue a status update so the pending condition is visible to the operator.
obj := &status.QueueObject{
UpdateType: status.UpdateAll,
Deployment: status.Deployment{
NamespacedName: gw.DeploymentName,
GatewayName: gw.Source.GetName(),
},
Comment thread
ciarams87 marked this conversation as resolved.
}
h.cfg.statusQueue.Enqueue(obj)
continue
}
Comment thread
ciarams87 marked this conversation as resolved.

deployment := h.cfg.nginxDeployments.GetOrStore(ctx, gw.DeploymentName, gw.Source.GetName())
if deployment == nil {
panic("expected deployment, got nil")
Expand Down Expand Up @@ -364,6 +379,51 @@ func (h *eventHandlerImpl) reconcileWAFPollers(ctx context.Context, gr *graph.Gr
h.cfg.wafPollerManager.StopPollersNotIn(activePolicies)
}

// gatewayHasPendingWAFBundle returns true if any WAFGatewayBindingPolicy that targets this Gateway
// (directly or via an attached route) has BundlePending=true.
// When true, the Gateway config push must be withheld to maintain fail-closed posture.
func gatewayHasPendingWAFBundle(gr *graph.Graph, gw *graph.Gateway) bool {
gwNsName := types.NamespacedName{
Namespace: gw.Source.GetNamespace(),
Name: gw.Source.GetName(),
}

for key, policy := range gr.NGFPolicies {
if key.GVK.Kind != kinds.WAFGatewayBindingPolicy {
continue
}
if policy.WAFState == nil || !policy.WAFState.BundlePending {
continue
}
if _, invalid := policy.InvalidForGateways[gwNsName]; invalid {
continue
}
for _, ref := range policy.TargetRefs {
switch ref.Kind {
case kinds.Gateway:
if ref.Nsname == gwNsName {
return true
}
case kinds.HTTPRoute, kinds.GRPCRoute:
routeKey := graph.RouteKey{
NamespacedName: ref.Nsname,
RouteType: routeTypeForKind(ref.Kind),
}
route, exists := gr.Routes[routeKey]
if !exists || !route.Valid {
continue
}
for _, parentRef := range route.ParentRefs {
if parentRef.Gateway != nil && parentRef.Gateway.NamespacedName == gwNsName {
Comment thread
ciarams87 marked this conversation as resolved.
return true
}
}
}
}
}
Comment thread
ciarams87 marked this conversation as resolved.
return false
}

// collectPolicyTargetDeployments returns the unique set of deployment names that a policy targets.
// It handles policies targeting Gateways directly, as well as policies targeting HTTPRoutes/GRPCRoutes
// (which are attached to Gateways via ParentRefs).
Expand Down Expand Up @@ -688,6 +748,14 @@ func (h *eventHandlerImpl) parseAndCaptureEvent(ctx context.Context, logger logr
}

h.cfg.processor.CaptureDeleteChange(e.Type, e.NamespacedName)
case events.WAFBundleReconcileEvent:
logger.V(1).Info("WAF bundle now available, triggering re-reconcile", "policy", e.PolicyNsName)
// Mark the processor dirty so Process() performs a graph rebuild even if this is the
// only event in the batch. Without this, clusterStateChanged=false causes Process() to
// return nil and the pending Gateway is never unblocked.
// We do not call CaptureUpsertChange here because that would overwrite the real policy
// object in cluster state with a metadata-only stub, corrupting the next graph build.
h.cfg.processor.ForceRebuild()
default:
panic(fmt.Errorf("unknown event type %T", e))
}
Expand Down
205 changes: 205 additions & 0 deletions internal/controller/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,60 @@ var _ = Describe("eventHandler", func() {
Expect(handler.GetLatestConfiguration()).To(BeEmpty())
})

It("should withhold config push and enqueue status update when WAF bundle is pending", func() {
	gatewayKey := types.NamespacedName{Namespace: "test", Name: "gateway"}

	// A valid Gateway backed by the "gateway-nginx" deployment.
	blockedGateway := &graph.Gateway{
		Valid: true,
		Source: &gatewayv1.Gateway{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: gatewayKey.Namespace,
				Name:      gatewayKey.Name,
			},
		},
		DeploymentName: types.NamespacedName{Namespace: "test", Name: "gateway-nginx"},
	}

	// The graph carries one WAF policy that targets the Gateway directly and is still
	// waiting for its bundle.
	blockedGraph := &graph.Graph{
		Gateways: map[types.NamespacedName]*graph.Gateway{
			gatewayKey: blockedGateway,
		},
		NGFPolicies: map[graph.PolicyKey]*graph.Policy{
			wafPolicyKey("waf-policy"): {
				Source:   makeWAFPolicy(false),
				WAFState: &graph.PolicyWAFState{BundlePending: true},
				TargetRefs: []graph.PolicyTargetRef{
					{Kind: kinds.Gateway, Nsname: gatewayKey},
				},
			},
		},
	}

	fakeProcessor.ProcessReturns(blockedGraph)
	fakeProcessor.GetLatestGraphReturns(blockedGraph)

	upsert := &events.UpsertEvent{Resource: &gatewayv1.Gateway{}}
	handler.HandleEventBatch(context.Background(), logr.Discard(), []any{upsert})

	// No NGINX config may be pushed while the bundle is pending.
	Expect(fakeNginxUpdater.UpdateConfigCallCount()).To(Equal(0))

	// The enqueued status update is drained by waitForStatusUpdates on another goroutine and
	// results in an UpdateGroup call, hence the asynchronous assertion.
	Eventually(fakeStatusUpdater.UpdateGroupCallCount).Should(BeNumerically(">=", 1))
})

It("should handle WAFBundleReconcileEvent without panicking and mark processor dirty", func() {
	reconcileEvent := events.WAFBundleReconcileEvent{
		PolicyNsName: types.NamespacedName{Namespace: "default", Name: "my-waf-policy"},
	}

	// The event type must be recognised by the handler rather than hitting its panic path.
	Expect(func() {
		handler.HandleEventBatch(context.Background(), logr.Discard(), []any{reconcileEvent})
	}).ShouldNot(Panic())

	// Handling the event must ask the processor for exactly one forced rebuild.
	Expect(fakeProcessor.ForceRebuildCallCount()).To(Equal(1))
})

It("should process events with volume mounts from Deployment", func() {
// Create a gateway with EffectiveNginxProxy containing Deployment VolumeMounts
gatewayWithVolumeMounts := &graph.Graph{
Expand Down Expand Up @@ -1901,6 +1955,157 @@ func TestCollectPolicyTargetDeployments(t *testing.T) {
}
}

// TestGatewayHasPendingWAFBundle exercises gatewayHasPendingWAFBundle against a single Gateway
// ("default/my-gateway") with policies that target it directly, target it through an HTTPRoute,
// target something else, are not pending, have no WAF state, or are invalid for the Gateway.
func TestGatewayHasPendingWAFBundle(t *testing.T) {
	t.Parallel()

	gwNsName := types.NamespacedName{Namespace: "default", Name: "my-gateway"}
	routeNsName := types.NamespacedName{Namespace: "default", Name: "my-route"}

	gw := &graph.Gateway{
		Valid: true,
		Source: &gatewayv1.Gateway{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: gwNsName.Namespace,
				Name:      gwNsName.Name,
			},
		},
		DeploymentName: types.NamespacedName{Namespace: "default", Name: "nginx-dep"},
	}

	// newPolicy builds a WAF policy with a single target ref. The policy is marked invalid for
	// every Gateway passed in invalidForGateways.
	newPolicy := func(
		pending bool,
		targetKind gatewayv1.Kind,
		targetNsName types.NamespacedName,
		invalidForGateways ...types.NamespacedName,
	) *graph.Policy {
		invalidSet := make(map[types.NamespacedName]struct{}, len(invalidForGateways))
		for _, nsName := range invalidForGateways {
			invalidSet[nsName] = struct{}{}
		}

		return &graph.Policy{
			Source:             &ngfAPI.WAFGatewayBindingPolicy{},
			WAFState:           &graph.PolicyWAFState{BundlePending: pending},
			InvalidForGateways: invalidSet,
			TargetRefs: []graph.PolicyTargetRef{
				{Kind: targetKind, Nsname: targetNsName},
			},
		}
	}

	// attachedHTTPRoute returns a fresh route map holding one valid HTTPRoute parented by gw.
	attachedHTTPRoute := func() map[graph.RouteKey]*graph.L7Route {
		key := graph.RouteKey{NamespacedName: routeNsName, RouteType: graph.RouteTypeHTTP}

		return map[graph.RouteKey]*graph.L7Route{
			key: {
				Valid: true,
				ParentRefs: []graph.ParentRef{
					{Gateway: &graph.ParentRefGateway{NamespacedName: gwNsName}},
				},
			},
		}
	}

	tests := []struct {
		policies   map[graph.PolicyKey]*graph.Policy
		routes     map[graph.RouteKey]*graph.L7Route
		name       string
		expPending bool
	}{
		{
			name:       "no policies returns false",
			policies:   map[graph.PolicyKey]*graph.Policy{},
			expPending: false,
		},
		{
			name: "pending policy targeting gateway directly returns true",
			policies: map[graph.PolicyKey]*graph.Policy{
				wafPolicyKey("waf"): newPolicy(true, kinds.Gateway, gwNsName),
			},
			expPending: true,
		},
		{
			name: "non-pending policy targeting gateway returns false",
			policies: map[graph.PolicyKey]*graph.Policy{
				wafPolicyKey("waf"): newPolicy(false, kinds.Gateway, gwNsName),
			},
			expPending: false,
		},
		{
			name: "pending policy targeting a different gateway returns false",
			policies: map[graph.PolicyKey]*graph.Policy{
				wafPolicyKey("waf"): newPolicy(
					true,
					kinds.Gateway,
					types.NamespacedName{Namespace: "default", Name: "other-gw"},
				),
			},
			expPending: false,
		},
		{
			name: "pending policy targeting HTTPRoute attached to gateway returns true",
			policies: map[graph.PolicyKey]*graph.Policy{
				wafPolicyKey("waf"): newPolicy(true, kinds.HTTPRoute, routeNsName),
			},
			routes:     attachedHTTPRoute(),
			expPending: true,
		},
		{
			name: "nil WAFState returns false",
			policies: map[graph.PolicyKey]*graph.Policy{
				wafPolicyKey("waf"): {
					Source:     &ngfAPI.WAFGatewayBindingPolicy{},
					WAFState:   nil,
					TargetRefs: []graph.PolicyTargetRef{{Kind: kinds.Gateway, Nsname: gwNsName}},
				},
			},
			expPending: false,
		},
		{
			name: "pending policy with gateway in InvalidForGateways returns false",
			policies: map[graph.PolicyKey]*graph.Policy{
				wafPolicyKey("waf"): newPolicy(true, kinds.Gateway, gwNsName, gwNsName),
			},
			expPending: false,
		},
		{
			name: "pending policy via route with gateway in InvalidForGateways returns false",
			policies: map[graph.PolicyKey]*graph.Policy{
				wafPolicyKey("waf"): newPolicy(true, kinds.HTTPRoute, routeNsName, gwNsName),
			},
			routes:     attachedHTTPRoute(),
			expPending: false,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			g := NewWithT(t)

			// Only the policies and routes vary per case; the Gateway under test is shared.
			result := gatewayHasPendingWAFBundle(
				&graph.Graph{
					NGFPolicies: tc.policies,
					Routes:      tc.routes,
				},
				gw,
			)

			g.Expect(result).To(Equal(tc.expPending))
		})
	}
}

func TestFindWAFPolicyKey(t *testing.T) {
t.Parallel()

Expand Down
6 changes: 5 additions & 1 deletion internal/controller/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func StartManager(cfg config.Config) error {
return err
}

wafPollerManager = createWAFPollerManager(cfg, wafFetcher, nginxUpdater, statusQueue)
wafPollerManager = createWAFPollerManager(ctx, cfg, wafFetcher, nginxUpdater, statusQueue, eventCh)

eventHandler := newEventHandlerImpl(eventHandlerConfig{
ctx: ctx,
Expand Down Expand Up @@ -361,10 +361,12 @@ func createAndRegisterProvisioner(
// createWAFPollerManager creates a WAF polling manager if Plus is enabled.
// Returns nil when Plus is not enabled.
func createWAFPollerManager(
ctx context.Context,
cfg config.Config,
wafFetcher fetch.Fetcher,
nginxUpdater *agent.NginxUpdaterImpl,
statusQueue *status.Queue,
eventCh chan<- any,
) wafpolling.PollerManager {
if !cfg.Plus {
return nil
Expand All @@ -374,6 +376,8 @@ func createWAFPollerManager(
Logger: cfg.Logger.WithName("wafPollingManager"),
Fetcher: wafFetcher,
Deployments: nginxUpdater.NginxDeployments,
EventCh: eventCh,
Ctx: ctx,
StatusCallback: func(targets []types.NamespacedName) {
for _, nsName := range targets {
dep := nginxUpdater.NginxDeployments.Get(nsName)
Expand Down
15 changes: 15 additions & 0 deletions internal/controller/state/change_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type ChangeProcessor interface {
Process(ctx context.Context) (graphCfg *graph.Graph)
// GetLatestGraph returns the latest Graph.
GetLatestGraph() *graph.Graph
// ForceRebuild forces the next Process() call to perform a full graph rebuild,
// without modifying the cluster state. Used when an external event (e.g. a WAF bundle
// becoming available) must trigger a rebuild without an accompanying resource change.
ForceRebuild()
}

// ChangeProcessorConfig holds configuration parameters for ChangeProcessorImpl.
Expand Down Expand Up @@ -93,6 +97,8 @@ type ChangeProcessorImpl struct {
updater Updater
// getAndResetClusterStateChanged tells if and how the cluster state has changed.
getAndResetClusterStateChanged func() bool
// forceClusterStateRebuild forces the changed flag to true without modifying cluster state.
forceClusterStateRebuild func()

cfg ChangeProcessorConfig
lock sync.Mutex
Expand Down Expand Up @@ -283,6 +289,7 @@ func NewChangeProcessorImpl(cfg ChangeProcessorConfig) *ChangeProcessorImpl {
)

processor.getAndResetClusterStateChanged = trackingUpdater.getAndResetChangedStatus
processor.forceClusterStateRebuild = trackingUpdater.forceRebuild
processor.updater = trackingUpdater

return processor
Expand Down Expand Up @@ -317,6 +324,14 @@ func (c *ChangeProcessorImpl) CaptureDeleteChange(resourceType ngftypes.ObjectTy
c.updater.Delete(resourceType, nsname)
}

// ForceRebuild forces the next Process() call to rebuild the graph without modifying cluster state.
//
// It holds c.lock, the same mutex Process() takes, while invoking forceClusterStateRebuild; that
// hook is assigned in NewChangeProcessorImpl from the tracking updater's forceRebuild.
func (c *ChangeProcessorImpl) ForceRebuild() {
c.lock.Lock()
defer c.lock.Unlock()

c.forceClusterStateRebuild()
}

func (c *ChangeProcessorImpl) Process(ctx context.Context) *graph.Graph {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down
12 changes: 12 additions & 0 deletions internal/controller/state/conditions/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1501,3 +1501,15 @@ func NewPolicyProgrammedStaleBundleWarning(errMsg string) Condition {
Message: fmt.Sprintf("Bundle fetch failed; using previously fetched bundle: %s", errMsg),
}
}

// NewPolicyNotProgrammedBundlePending returns a Condition that indicates the WAF bundle has not
// yet been successfully fetched. The Gateway config push is withheld until the bundle is available,
// maintaining a fail-closed posture.
func NewPolicyNotProgrammedBundlePending(errMsg string) Condition {
return Condition{
Type: string(WAFProgrammedConditionType),
Status: metav1.ConditionFalse,
Reason: string(PolicyReasonPending),
Message: fmt.Sprintf("Waiting for WAF bundle; last fetch error: %s", errMsg),
Comment thread
ciarams87 marked this conversation as resolved.
}
}
Loading
Loading