Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion providers/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ type Cloud struct {
// sharedResourceLock is used to serialize GCE operations that may mutate shared state to
// prevent inconsistencies. For example, load balancers manipulation methods will take the
// lock to prevent shared resources from being prematurely deleted while the operation is
// in progress.
sharedResourceLock sync.Mutex
// sharedResourceLocks is a concurrent map used for resource-specific fine-grained locking of shared resources (e.g. InstanceGroups, shared HealthChecks).
sharedResourceLocks sync.Map // map[string]*sync.Mutex
// AlphaFeatureGate gates gce alpha features in Cloud instance.
// Related wrapper functions that interacts with gce alpha api should examine whether
// the corresponding api is enabled.
Expand Down Expand Up @@ -219,6 +220,30 @@ type Cloud struct {
enableL4DenyFirewallRollbackCleanup bool
}

type SharedResourceType string

const (
ResourceTypeHealthCheck SharedResourceType = "hc"
ResourceTypeInstanceGroup SharedResourceType = "ig"
ResourceTypeFirewall SharedResourceType = "fw"
)

func (g *Cloud) getLockForResource(resType SharedResourceType, name string) *sync.Mutex {
key := string(resType) + ":" + name
v, _ := g.sharedResourceLocks.LoadOrStore(key, &sync.Mutex{})
return v.(*sync.Mutex)
}

// lockResourceIfShared conditionally acquires a lock and returns a func to defer for unlocking.
func (g *Cloud) lockResourceIfShared(shared bool, resType SharedResourceType, name string) func() {
if !shared {
return func() {} // No-op
}
lock := g.getLockForResource(resType, name)
lock.Lock()
return lock.Unlock
}

// ConfigGlobal is the in memory representation of the gce.conf config data
// TODO: replace gcfg with json
type ConfigGlobal struct {
Expand Down
55 changes: 34 additions & 21 deletions providers/gce/gce_loadbalancer_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
}
}

// Lock the sharedResourceLock to prevent any deletions of shared resources while assembling shared resources here
g.sharedResourceLock.Lock()
defer g.sharedResourceLock.Unlock()

// Ensure health check exists before creating the backend service. The health check is shared
// if externalTrafficPolicy=Cluster.
sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc)
Expand Down Expand Up @@ -354,9 +350,6 @@ func (g *Cloud) updateInternalLoadBalancer(clusterName, clusterID string, svc *v
klog.V(2).Infof("Skipped updateInternalLoadBalancer for service %s/%s as service contains %q loadBalancerClass.", svc.Namespace, svc.Name, *svc.Spec.LoadBalancerClass)
return cloudprovider.ImplementedElsewhere
}
g.sharedResourceLock.Lock()
defer g.sharedResourceLock.Unlock()

igName := makeInstanceGroupName(clusterID)
igLinks, err := g.ensureInternalInstanceGroups(igName, nodes)
if err != nil {
Expand Down Expand Up @@ -393,9 +386,6 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string,
sharedBackend := shareBackendService(svc)
sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc)

g.sharedResourceLock.Lock()
defer g.sharedResourceLock.Unlock()

klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): attempting delete of region internal address", loadBalancerName)
ensureAddressDeleted(g, loadBalancerName, g.region)

Expand Down Expand Up @@ -500,7 +490,9 @@ func (g *Cloud) teardownInternalHealthCheckAndFirewall(svc *v1.Service, hcName s
return nil
}

func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string) error {
func (g *Cloud) ensureInternalFirewall(svc *v1.Service, fwName, fwDesc, destinationIP string, sourceRanges []string, portRanges []string, protocol v1.Protocol, nodes []*v1.Node, legacyFwName string, shared bool) error {
defer g.lockResourceIfShared(shared, ResourceTypeFirewall, fwName)()

klog.V(2).Infof("ensureInternalFirewall(%v): checking existing firewall", fwName)
targetTags, err := g.GetNodeTags(nodeNames(nodes))
if err != nil {
Expand Down Expand Up @@ -587,15 +579,15 @@ func (g *Cloud) ensureInternalFirewalls(loadBalancerName, ipAddress, clusterID s
return err
}

err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName)
err = g.ensureInternalFirewall(svc, MakeFirewallName(loadBalancerName), fwDesc, ipAddress, sourceRanges.StringSlice(), portRanges, protocol, nodes, loadBalancerName, false)
if err != nil {
return err
}

// Second firewall is for health checking nodes / services
fwHCName := makeHealthCheckFirewallName(loadBalancerName, clusterID, sharedHealthCheck)
hcSrcRanges := L4LoadBalancerSrcRanges()
return g.ensureInternalFirewall(svc, fwHCName, "", "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes, "")
return g.ensureInternalFirewall(svc, fwHCName, "", "", hcSrcRanges, []string{healthCheckPort}, v1.ProtocolTCP, nodes, "", sharedHealthCheck)
}

func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32) (*compute.HealthCheck, error) {
Expand All @@ -608,17 +600,34 @@ func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedN
}

if hc == nil {
klog.V(2).Infof("ensureInternalHealthCheck: did not find health check %v, creating one with port %v path %v", name, port, path)
if err = g.CreateHealthCheck(expectedHC); err != nil {
return nil, err
}
hc, err = g.GetHealthCheck(name)
var created bool
err = func() error {
defer g.lockResourceIfShared(shared, ResourceTypeHealthCheck, name)()

hc, err = g.GetHealthCheck(name)
if err != nil && !isNotFound(err) {
return err
}

if hc == nil {
klog.V(2).Infof("ensureInternalHealthCheck: did not find health check %v, creating one with port %v path %v", name, port, path)
if err = g.CreateHealthCheck(expectedHC); err != nil {
return err
}
hc, err = g.GetHealthCheck(name)
if err != nil {
return err
}
created = true
}
return nil
}()
if err != nil {
klog.Errorf("Failed to get http health check %v", err)
return nil, err
}
klog.V(2).Infof("ensureInternalHealthCheck: created health check %v", name)
return hc, nil
if created {
return hc, nil
}
}

if needToUpdateHealthChecks(hc, expectedHC) {
Expand All @@ -638,6 +647,10 @@ func (g *Cloud) ensureInternalHealthCheck(name string, svcName types.NamespacedN
}

func (g *Cloud) ensureInternalInstanceGroup(name, zone string, nodes []*v1.Node, emptyZoneNodes []*v1.Node) (string, error) {
lock := g.getLockForResource(ResourceTypeInstanceGroup, name+"-"+zone)
lock.Lock()
defer lock.Unlock()

klog.V(2).Infof("ensureInternalInstanceGroup(%v, %v): checking group that it contains %v nodes [node names limited, total number of nodes: %d], the following nodes have empty string in the zone field and won't be deleted: %v", name, zone, loggableNodeNames(nodes), len(nodes), loggableNodeNames(emptyZoneNodes))
ig, err := g.GetInstanceGroup(name, zone)
if err != nil && !isNotFound(err) {
Expand Down
Loading