diff --git a/providers/gce/gce.go b/providers/gce/gce.go index 0b7ae1d79..7a91de23d 100644 --- a/providers/gce/gce.go +++ b/providers/gce/gce.go @@ -176,8 +176,10 @@ type Cloud struct { // sharedResourceLock is used to serialize GCE operations that may mutate shared state to // prevent inconsistencies. For example, load balancers manipulation methods will take the // lock to prevent shared resources from being prematurely deleted while the operation is // in progress. sharedResourceLock sync.Mutex + // sharedResourceLocks is a concurrent map used for resource-specific fine-grained locking of shared resources (e.g. InstanceGroups, shared HealthChecks). + sharedResourceLocks sync.Map // map[string]*sync.Mutex // AlphaFeatureGate gates gce alpha features in Cloud instance. // Related wrapper functions that interacts with gce alpha api should examine whether // the corresponding api is enabled. @@ -219,6 +221,30 @@ type Cloud struct { enableL4DenyFirewallRollbackCleanup bool } +type SharedResourceType string + +const ( + ResourceTypeHealthCheck SharedResourceType = "hc" + ResourceTypeInstanceGroup SharedResourceType = "ig" + ResourceTypeFirewall SharedResourceType = "fw" +) + +func (g *Cloud) getLockForResource(resType SharedResourceType, name string) *sync.Mutex { + key := string(resType) + ":" + name + v, _ := g.sharedResourceLocks.LoadOrStore(key, &sync.Mutex{}) + return v.(*sync.Mutex) +} + +// lockResourceIfShared conditionally acquires a lock and returns a func to defer for unlocking. 
+func (g *Cloud) lockResourceIfShared(shared bool, resType SharedResourceType, name string) func() { + if !shared { + return func() {} // No-op + } + lock := g.getLockForResource(resType, name) + lock.Lock() + return lock.Unlock +} + // ConfigGlobal is the in memory representation of the gce.conf config data // TODO: replace gcfg with json type ConfigGlobal struct { diff --git a/providers/gce/gce_loadbalancer_internal.go b/providers/gce/gce_loadbalancer_internal.go index 21eab518b..e977234c1 100644 --- a/providers/gce/gce_loadbalancer_internal.go +++ b/providers/gce/gce_loadbalancer_internal.go @@ -128,10 +128,6 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v } } - // Lock the sharedResourceLock to prevent any deletions of shared resources while assembling shared resources here - g.sharedResourceLock.Lock() - defer g.sharedResourceLock.Unlock() - // Ensure health check exists before creating the backend service. The health check is shared // if externalTrafficPolicy=Cluster. 
sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc) @@ -354,9 +350,6 @@ func (g *Cloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v klog.V(2).Infof("Skipped updateInternalLoadBalancer for service %s/%s as service contains %q loadBalancerClass.", svc.Namespace, svc.Name, *svc.Spec.LoadBalancerClass) return cloudprovider.ImplementedElsewhere } - g.sharedResourceLock.Lock() - defer g.sharedResourceLock.Unlock() - igName := makeInstanceGroupName(clusterID) igLinks, err := g.ensureInternalInstanceGroups(igName, nodes) if err != nil { @@ -393,9 +386,6 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string, sharedBackend := shareBackendService(svc) sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc) - g.sharedResourceLock.Lock() - defer g.sharedResourceLock.Unlock() - klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): attempting delete of region internal address", loadBalancerName) ensureAddressDeleted(g, loadBalancerName, g.region) @@ -500,7 +490,9 @@ func (g *Cloud) teardownInternalHealthCheckAndFirewall(svc *v1.Service, hcName s return nil } -func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string) error { +func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string, shared bool) error { + defer g.lockResourceIfShared(shared, ResourceTypeFirewall, fwName)() + klog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName) targetTags, err := g.GetNodeTags(nodeNames(nodes)) if err != nil { @@ -587,7 +579,7 @@ func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID s return err } - err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, 
sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName) + err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName, false) if err != nil { return err } @@ -595,7 +587,7 @@ func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID s // Second firewall is for health checking nodes / services fwHCName := makeHealthCheckFirewallName(loadBalancerName, clusterID, sharedHealthCheck) hcSrcRanges := L4LoadBalancerSrcRanges() - return g.ensureInternalFirewall(svc, fwHCName, "", "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes, "") + return g.ensureInternalFirewall(svc, fwHCName, "", "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes, "", sharedHealthCheck) } func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) (*compute.HealthCheck, error) { @@ -608,17 +600,34 @@ func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedN } if hc == nil { - klog.V(2).Infof("ensureInternalHealthCheck: did not find health check %v, creating one with port %v path %v", name, port, path) - if err = g.CreateHealthCheck(expectedHC); err != nil { - return nil, err - } - hc, err = g.GetHealthCheck(name) + var created bool + err = func() error { + defer g.lockResourceIfShared(shared, ResourceTypeHealthCheck, name)() + + hc, err = g.GetHealthCheck(name) + if err != nil && !isNotFound(err) { + return err + } + + if hc == nil { + klog.V(2).Infof("ensureInternalHealthCheck: did not find health check %v, creating one with port %v path %v", name, port, path) + if err = g.CreateHealthCheck(expectedHC); err != nil { + return err + } + hc, err = g.GetHealthCheck(name) + if err != nil { + return err + } + created = true + } + return nil + }() if err != nil { - klog.Errorf("Failed to get http health check %v", err) return nil, err } - 
klog.V(2).Infof("ensureInternalHealthCheck: created health check %v", name) - return hc, nil + if created { + return hc, nil + } } if needToUpdateHealthChecks(hc, expectedHC) { @@ -638,6 +647,10 @@ func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedN } func (g *Cloud) ensureInternalInstanceGroup(name, zone string, nodes []*v1.Node, emptyZoneNodes []*v1.Node) (string, error) { + lock := g.getLockForResource(ResourceTypeInstanceGroup, name+"-"+zone) + lock.Lock() + defer lock.Unlock() + klog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): checking group that it contains %v nodes [node names limited, total number of nodes: %d], the following nodes have empty string in the zone field and won't be deleted: %v", name, zone, loggableNodeNames(nodes), len(nodes), loggableNodeNames(emptyZoneNodes)) ig, err := g.GetInstanceGroup(name, zone) if err != nil && !isNotFound(err) { diff --git a/providers/gce/gce_loadbalancer_internal_test.go b/providers/gce/gce_loadbalancer_internal_test.go index 155542509..103a08d8d 100644 --- a/providers/gce/gce_loadbalancer_internal_test.go +++ b/providers/gce/gce_loadbalancer_internal_test.go @@ -22,18 +22,23 @@ package gce import ( "context" "fmt" + "net/http" "reflect" "sort" "strings" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/mock" "google.golang.org/api/compute/v1" + "google.golang.org/api/googleapi" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -974,7 +979,7 @@ func TestEnsureInternalFirewallDeletesLegacyFirewall(t *testing.T) { []string{"123"}, v1.ProtocolTCP, nodes, - "") + "", false) if err != nil { t.Errorf("Unexpected error %v when ensuring legacy firewall %s for 
svc %+v", err, lbName, svc) } @@ -989,7 +994,7 @@ func TestEnsureInternalFirewallDeletesLegacyFirewall(t *testing.T) { []string{"123", "456"}, v1.ProtocolTCP, nodes, - lbName) + lbName, false) if err != nil { t.Errorf("Unexpected error %v when ensuring firewall %s for svc %+v", err, fwName, svc) } @@ -1012,7 +1017,7 @@ func TestEnsureInternalFirewallDeletesLegacyFirewall(t *testing.T) { []string{"123", "456", "789"}, v1.ProtocolTCP, nodes, - lbName) + lbName, false) if err != nil { t.Errorf("Unexpected error %v when ensuring firewall %s for svc %+v", err, fwName, svc) } @@ -1058,7 +1063,7 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) { []string{"123"}, v1.ProtocolTCP, nodes, - lbName) + lbName, false) require.Nil(t, err, "Should success when XPN is on.") checkEvent(t, recorder, FirewallChangeMsg, true) @@ -1077,7 +1082,7 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) { []string{"123"}, v1.ProtocolTCP, nodes, - lbName) + lbName, false) require.NoError(t, err) existingFirewall, err := gce.GetFirewall(fwName) require.NoError(t, err) @@ -1097,7 +1102,7 @@ func TestEnsureInternalFirewallSucceedsOnXPN(t *testing.T) { []string{"123"}, v1.ProtocolTCP, nodes, - lbName) + lbName, false) require.Nil(t, err, "Should success when XPN is on.") checkEvent(t, recorder, FirewallChangeMsg, true) @@ -2013,7 +2018,7 @@ func TestEnsureInternalFirewallPortRanges(t *testing.T) { getPortRanges(tc.Input), v1.ProtocolTCP, nodes, - "") + "", false) if err != nil { t.Errorf("Unexpected error %v when ensuring legacy firewall %s for svc %+v", err, lbName, svc) } @@ -2050,7 +2055,7 @@ func TestEnsureInternalFirewallDestinations(t *testing.T) { []string{"8080"}, v1.ProtocolTCP, nodes, - "") + "", false) if err != nil { t.Errorf("Unexpected error %v when ensuring firewall %s for svc %+v", err, fwName, svc) } @@ -2070,7 +2075,7 @@ func TestEnsureInternalFirewallDestinations(t *testing.T) { []string{"8080"}, v1.ProtocolTCP, nodes, - "") + "", false) if err != nil { 
t.Errorf("Unexpected error %v when ensuring firewall %s for svc %+v", err, fwName, svc) } @@ -2502,3 +2507,204 @@ func TestEnsureInternalLoadBalancerClass(t *testing.T) { } } } + +func TestEnsureInternalBackendServiceConflict(t *testing.T) { + t.Parallel() + + vals := DefaultTestClusterValues() + nodeNames := []string{"test-node-1"} + + gce, err := fakeGCECloud(vals) + require.NoError(t, err) + + svc := fakeLoadbalancerService(string(LBTypeInternal)) + lbName := gce.GetLoadBalancerName(context.TODO(), "", svc) + nodes, err := createAndInsertNodes(gce, nodeNames, vals.ZoneName) + require.NoError(t, err) + igName := makeInstanceGroupName(vals.ClusterID) + igLinks, err := gce.ensureInternalInstanceGroups(igName, nodes) + require.NoError(t, err) + + sharedBackend := shareBackendService(svc) + bsName := makeBackendServiceName(lbName, vals.ClusterID, sharedBackend, cloud.SchemeInternal, "TCP", svc.Spec.SessionAffinity) + + // Create backend initially + err = gce.ensureInternalBackendService(bsName, "description", svc.Spec.SessionAffinity, cloud.SchemeInternal, "TCP", igLinks, "") + require.NoError(t, err) + + // Mock 412 error + c := gce.c.(*cloud.MockGCE) + c.MockRegionBackendServices.UpdateHook = func(ctx context.Context, key *meta.Key, obj *compute.BackendService, m *cloud.MockRegionBackendServices, options ...cloud.Option) error { + return &googleapi.Error{Code: http.StatusPreconditionFailed, Message: "Precondition Failed"} + } + + // Update the Backend Service to trigger the update hook + err = gce.ensureInternalBackendService(bsName, "description", v1.ServiceAffinityNone, cloud.SchemeInternal, "TCP", igLinks, "") + + // Verify that the error is propagated + require.Error(t, err) + assert.Contains(t, err.Error(), "Precondition Failed") + assert.IsType(t, &googleapi.Error{}, err) + if gErr, ok := err.(*googleapi.Error); ok { + assert.Equal(t, http.StatusPreconditionFailed, gErr.Code) + } +} + +func TestResourceLockErrorRecovery(t *testing.T) { + t.Parallel() + vals 
:= DefaultTestClusterValues() + gce, _ := fakeGCECloud(vals) + c := gce.c.(*cloud.MockGCE) + + svc := fakeLoadbalancerService(string(LBTypeInternal)) + svcName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace} + + var calls int32 + c.MockHealthChecks.InsertHook = func(ctx context.Context, key *meta.Key, obj *compute.HealthCheck, m *cloud.MockHealthChecks, options ...cloud.Option) (bool, error) { + if atomic.AddInt32(&calls, 1) == 1 { + return true, &googleapi.Error{Code: http.StatusInternalServerError, Message: "Simulated GCP Error"} + } + return false, nil + } + + // 1st request should error out and release lock + _, err := gce.ensureInternalHealthCheck("hc-lock-test", svcName, true, "/", 80) + require.Error(t, err) + assert.Contains(t, err.Error(), "Simulated GCP Error") + + // 2nd request should successfully acquire the lock, create the health check, and succeed. + // We use a channel to ensure that if the lock was leaked, the test fails quickly instead of timing out. + errCh := make(chan error, 1) + hcCh := make(chan *compute.HealthCheck, 1) + go func() { + hc, err := gce.ensureInternalHealthCheck("hc-lock-test", svcName, true, "/", 80) + errCh <- err + hcCh <- hc + }() + + select { + case err := <-errCh: + hc := <-hcCh + require.NoError(t, err) + assert.NotNil(t, hc) + case <-time.After(2 * time.Second): + t.Fatal("Deadlock detected: Second request timed out trying to acquire lock. The lock was likely leaked.") + } + + assert.Equal(t, int32(2), atomic.LoadInt32(&calls)) +} + +func TestEnsureInternalInstanceGroupNodeSyncScaling(t *testing.T) { + t.Parallel() + vals := DefaultTestClusterValues() + gce, _ := fakeGCECloud(vals) + c := gce.c.(*cloud.MockGCE) + + igName := "test-ig-node-scale" + zone := vals.ZoneName + + // Inject a small sleep in Get and Insert to widen the race window. 
+ c.MockInstanceGroups.GetHook = func(ctx context.Context, key *meta.Key, m *cloud.MockInstanceGroups, options ...cloud.Option) (bool, *compute.InstanceGroup, error) { + time.Sleep(2 * time.Millisecond) + return false, nil, nil + } + c.MockInstanceGroups.InsertHook = func(ctx context.Context, key *meta.Key, obj *compute.InstanceGroup, m *cloud.MockInstanceGroups, options ...cloud.Option) (bool, error) { + time.Sleep(2 * time.Millisecond) + return false, nil + } + + var eg errgroup.Group + workers := 20 + + for i := 0; i < workers; i++ { + workerID := i + eg.Go(func() error { + var nodes []*v1.Node + for j := 0; j < (workerID%5)+1; j++ { + nodeName := fmt.Sprintf("node-%d", j) + nodes = append(nodes, &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: nodeName}, + }) + } + + _, err := gce.ensureInternalInstanceGroup(igName, zone, nodes, nil) + return err + }) + } + + err := eg.Wait() + require.NoError(t, err, "All workers should complete without error") + + // We verify that the final state precisely matches one of the expected valid subsets. 
+ instances, err := gce.ListInstancesInInstanceGroup(igName, zone, "ALL") + require.NoError(t, err) + + actualNodes := make(map[string]bool) + for _, ins := range instances { + parts := strings.Split(ins.Instance, "/") + actualNodes[parts[len(parts)-1]] = true + } + + validStates := []map[string]bool{ + {"node-0": true}, + {"node-0": true, "node-1": true}, + {"node-0": true, "node-1": true, "node-2": true}, + {"node-0": true, "node-1": true, "node-2": true, "node-3": true}, + {"node-0": true, "node-1": true, "node-2": true, "node-3": true, "node-4": true}, + } + + isValid := false + for _, state := range validStates { + if reflect.DeepEqual(actualNodes, state) { + isValid = true + break + } + } + assert.True(t, isValid, "Final InstanceGroup count should precisely match exactly one of the known synchronized states, got: %v", actualNodes) +} + +func TestSharedVsNonSharedHealthCheckContention(t *testing.T) { + t.Parallel() + vals := DefaultTestClusterValues() + gce, _ := fakeGCECloud(vals) + c := gce.c.(*cloud.MockGCE) + + svc := fakeLoadbalancerService(string(LBTypeInternal)) + svcName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace} + + var sharedInsertCount int32 + + c.MockHealthChecks.InsertHook = func(ctx context.Context, key *meta.Key, obj *compute.HealthCheck, m *cloud.MockHealthChecks, options ...cloud.Option) (bool, error) { + time.Sleep(5 * time.Millisecond) + if obj.Name == "shared-hc" { + atomic.AddInt32(&sharedInsertCount, 1) + } + return false, nil + } + c.MockHealthChecks.GetHook = func(ctx context.Context, key *meta.Key, m *cloud.MockHealthChecks, options ...cloud.Option) (bool, *compute.HealthCheck, error) { + time.Sleep(5 * time.Millisecond) + return false, nil, nil + } + + var eg errgroup.Group + workers := 50 + + for i := 0; i < workers; i++ { + workerID := i + eg.Go(func() error { + if workerID%2 == 0 { + _, err := gce.ensureInternalHealthCheck("shared-hc", svcName, true, "/", 80) + return err + } else { + hcName := 
fmt.Sprintf("unique-hc-%d", workerID) + _, err := gce.ensureInternalHealthCheck(hcName, svcName, false, "/", 80) + return err + } + }) + } + + err := eg.Wait() + require.NoError(t, err, "All health check routines should complete without error") + + assert.Equal(t, int32(1), atomic.LoadInt32(&sharedInsertCount), "Shared health check should only be inserted exactly once") +} diff --git a/providers/gce/gce_test.go b/providers/gce/gce_test.go index 2fa1b6e3d..a8e493d30 100644 --- a/providers/gce/gce_test.go +++ b/providers/gce/gce_test.go @@ -638,3 +638,36 @@ func TestGetProjectsBasePath(t *testing.T) { } } } + +func TestSharedResourceLocksScoping(t *testing.T) { + vals := DefaultTestClusterValues() + gce := NewFakeGCECloud(vals) + + // Use the exact same resource name across different ResourceTypes + // to check that the namespace isolation prevents collisions. + const sharedName = "collide-test" + + unlockHC := gce.lockResourceIfShared(true, ResourceTypeHealthCheck, sharedName) + unlockHC() + + unlockIG := gce.lockResourceIfShared(true, ResourceTypeInstanceGroup, sharedName) + unlockIG() + + var keys []string + gce.sharedResourceLocks.Range(func(key, value any) bool { + keys = append(keys, key.(string)) + return true + }) + + // This check proves that namespace scoping prevented a key collision. + if len(keys) != 2 { + t.Fatalf("Expected exactly 2 locks (scoping failed to prevent collision), got %d", len(keys)) + } + + for _, k := range keys { + // Validating the internal prefix grammar. + if !strings.HasPrefix(k, string(ResourceTypeHealthCheck)+":") && !strings.HasPrefix(k, string(ResourceTypeInstanceGroup)+":") { + t.Errorf("Unexpected lock scoped in sharedResourceLocks: %s", k) + } + } +}