Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 109 additions & 8 deletions pkg/openstack/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ const (
defaultProxyHostnameSuffix = "nip.io"
ServiceAnnotationLoadBalancerID = "loadbalancer.openstack.org/load-balancer-id"

// ServiceAnnotationLoadBalancerTags The lb tags annotation is used to set tags on the loadbalancer resource itself(support json list).
ServiceAnnotationLoadBalancerTags = "loadbalancer.openstack.org/load-balancer-tags"
// ServiceAnnotationListenerTags The listener tags annotation is used to set tags on the loadbalancer listener resources(support json list).
ServiceAnnotationListenerTags = "loadbalancer.openstack.org/listener-tags"
// ServiceAnnotationPoolTags The pool tags annotation is used to set tags on the loadbalancer pool resources(support json list).
ServiceAnnotationPoolTags = "loadbalancer.openstack.org/pool-tags"

// Octavia resources name formats
servicePrefix = "kube_service_"
lbFormat = "%s%s_%s_%s"
Expand Down Expand Up @@ -146,6 +153,9 @@ type serviceConfig struct {
healthMonitorMaxRetries int
healthMonitorMaxRetriesDown int
preferredIPFamily corev1.IPFamily // preferred (the first) IP family indicated in service's `spec.ipFamilies`
lbTags string
listenerTags string
poolTags string
}

type listenerKey struct {
Expand Down Expand Up @@ -197,6 +207,20 @@ func getLoadbalancerByName(ctx context.Context, client *gophercloud.ServiceClien
return &validLBs[0], nil
}

// mergeTags merges existedTags and desiredTags, returns true if all desiredTags are in existedTags.
func mergeTags(existedTags []string, desiredTags []string) (bool, []string) {
if len(existedTags) == 0 || existedTags == nil {
return false, desiredTags
}
desiredTagsSet := sets.NewString(desiredTags...)
tagSet := sets.NewString(existedTags...)
if tagSet.HasAll(desiredTags...) {
return true, nil
} else {
return false, tagSet.Union(desiredTagsSet).List()
}
}

func popListener(existingListeners []listeners.Listener, id string) []listeners.Listener {
newListeners := []listeners.Listener{}
for _, existingListener := range existingListeners {
Expand Down Expand Up @@ -235,7 +259,13 @@ func (lbaas *LbaasV2) createOctaviaLoadBalancer(ctx context.Context, name, clust
}

if svcConf.supportLBTags {
createOpts.Tags = []string{svcConf.lbName}
var desiredTags []string
if len(svcConf.lbTags) == 0 {
klog.V(4).Infof("No load balancer tags found from service annotation key: %s", ServiceAnnotationLoadBalancerTags)
} else {
desiredTags = cpoutil.SplitTrim(svcConf.lbTags, ',')
}
createOpts.Tags = append([]string{svcConf.lbName}, desiredTags...)
}

if svcConf.flavorID != "" {
Expand Down Expand Up @@ -923,16 +953,41 @@ func (lbaas *LbaasV2) ensureOctaviaPool(ctx context.Context, lbID string, name s
}
}

var desiredTags []string
if len(svcConf.poolTags) == 0 {
klog.V(4).Infof("No pools tags found from service annotation key: %s", ServiceAnnotationPoolTags)
} else {
desiredTags = cpoutil.SplitTrim(svcConf.poolTags, ',')
}

if pool == nil {
createOpt := lbaas.buildPoolCreateOpt(listener.Protocol, service, svcConf, name)
createOpt.ListenerID = listener.ID

if svcConf.supportLBTags {
createOpt.Tags = desiredTags
}
klog.InfoS("Creating pool", "listenerID", listener.ID, "protocol", createOpt.Protocol)
klog.V(4).Infof("Pool create options: %+v", createOpt)
pool, err = openstackutil.CreatePool(ctx, lbaas.lb, createOpt, lbID)
if err != nil {
return nil, err
}
klog.V(2).Infof("Pool %s created for listener %s", pool.ID, listener.ID)
} else if svcConf.supportLBTags {
// Update tags if needed
if len(desiredTags) > 0 {
klog.V(4).Infof("Desired pools tags: %+v from service annotation key: %s", desiredTags, ServiceAnnotationPoolTags)
if ok, tags := mergeTags(pool.Tags, desiredTags); !ok {
klog.V(4).Infof("Will update pools' tags, current pools tags: %+v, desired tags: %+v", pool.Tags, tags)
updateOpts := v2pools.UpdateOpts{
Tags: &tags,
}
klog.InfoS("Updating pool tags", "poolID", pool.ID, "listenerID", listener.ID, "lbID", lbID, "tags", desiredTags)
if err := openstackutil.UpdatePool(ctx, lbaas.lb, lbID, pool.ID, updateOpts); err != nil {
klog.Warningf("Error updating LB pool tags: %v", err)
}
}
}
}

if lbaas.opts.ProviderRequiresSerialAPICalls {
Expand Down Expand Up @@ -1099,11 +1154,18 @@ func (lbaas *LbaasV2) ensureOctaviaListener(ctx context.Context, lbID string, na
updateOpts := listeners.UpdateOpts{}

if svcConf.supportLBTags {
if !slices.Contains(listener.Tags, svcConf.lbName) {
var newTags []string
copy(newTags, listener.Tags)
newTags = append(newTags, svcConf.lbName)
updateOpts.Tags = &newTags
// Get desired tags from Service annotations
var desiredTags []string
if len(svcConf.listenerTags) == 0 {
klog.V(4).Infof("No listeners tags found from service annotation key: %s", ServiceAnnotationListenerTags)
} else {
desiredTags = cpoutil.SplitTrim(svcConf.listenerTags, ',')
}

tagsToEnsure := append([]string{svcConf.lbName}, desiredTags...)
if ok, tags := mergeTags(listener.Tags, tagsToEnsure); !ok {
klog.V(4).Infof("Will update listeners' tags, current listeners tags: %+v, desired tags: %+v", listener.Tags, tags)
updateOpts.Tags = &tags
listenerChanged = true
}
}
Expand Down Expand Up @@ -1177,7 +1239,22 @@ func (lbaas *LbaasV2) buildListenerCreateOpt(ctx context.Context, port corev1.Se
}

if svcConf.supportLBTags {
listenerCreateOpt.Tags = []string{svcConf.lbName}
// Get desired tags from Service annotations
var desiredTags []string
if len(svcConf.listenerTags) == 0 {
klog.V(4).Infof("No listeners tags found from service annotation key: %s", ServiceAnnotationListenerTags)
} else {
desiredTags = cpoutil.SplitTrim(svcConf.listenerTags, ',')
}

tags := []string{svcConf.lbName}
if len(desiredTags) > 0 {
klog.V(4).Infof("Desired listener tags: %+v from service annotation key: %s", desiredTags, ServiceAnnotationListenerTags)
if ok, merged := mergeTags(tags, desiredTags); !ok {
tags = merged
}
}
listenerCreateOpt.Tags = tags
}

if openstackutil.IsOctaviaFeatureSupported(ctx, lbaas.lb, openstackutil.OctaviaFeatureTimeout, lbaas.opts.LBProvider) {
Expand Down Expand Up @@ -1365,6 +1442,11 @@ func (lbaas *LbaasV2) checkService(ctx context.Context, service *corev1.Service,
return fmt.Errorf("no service ports provided")
}

annotations := service.GetAnnotations()
svcConf.lbTags = annotations[ServiceAnnotationLoadBalancerTags]
svcConf.listenerTags = annotations[ServiceAnnotationListenerTags]
svcConf.poolTags = annotations[ServiceAnnotationPoolTags]

if len(service.Spec.IPFamilies) > 0 {
// Since OCCM does not support multiple load-balancers per service yet,
// the first IP family will determine the IP family of the load-balancer
Expand Down Expand Up @@ -1756,6 +1838,25 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
// Make sure LB ID will be saved at this point.
lbaas.updateServiceAnnotation(service, ServiceAnnotationLoadBalancerID, loadbalancer.ID)

if svcConf.supportLBTags {
var desiredTags []string
if len(svcConf.lbTags) == 0 {
klog.V(4).Infof("No load balancer tags found from service annotation key: %s", ServiceAnnotationLoadBalancerTags)
} else {
desiredTags = cpoutil.SplitTrim(svcConf.lbTags, ',')
}
// add the service annotation tags to load balancer tags if the tags don't match
if len(desiredTags) > 0 {
klog.V(4).Infof("Desired load balancer tags: %v from service annotation: %v", desiredTags, ServiceAnnotationLoadBalancerTags)
if ok, tags := mergeTags(loadbalancer.Tags, desiredTags); !ok {
klog.Infof("Will update load balancer's tags, current load balancer tags: %+v, desired tags: %+v", loadbalancer.Tags, tags)
if err := openstackutil.UpdateLoadBalancerTags(ctx, lbaas.lb, loadbalancer.ID, tags); err != nil {
klog.Warningf("failed to update load balancer %s tags: %v", loadbalancer.ID, err)
}
}
}
}

if loadbalancer.ProvisioningStatus != activeStatus {
return nil, fmt.Errorf("load balancer %s is not ACTIVE, current provisioning status: %s", loadbalancer.ID, loadbalancer.ProvisioningStatus)
}
Expand Down