Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 114 additions & 3 deletions pkg/openstack/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ const (
defaultProxyHostnameSuffix = "nip.io"
ServiceAnnotationLoadBalancerID = "loadbalancer.openstack.org/load-balancer-id"

// ServiceAnnotationLoadBalancerTags The lb tags annotation is used to set tags on the loadbalancer resource itself(support json list).
ServiceAnnotationLoadBalancerTags = "loadbalancer.openstack.org/load-balancer-tags"
// ServiceAnnotationListenerTags The listener tags annotation is used to set tags on the loadbalancer listener resources(support json list).
ServiceAnnotationListenerTags = "loadbalancer.openstack.org/listener-tags"
// ServiceAnnotationPoolTags The pool tags annotation is used to set tags on the loadbalancer pool resources(support json list).
ServiceAnnotationPoolTags = "loadbalancer.openstack.org/pool-tags"

// Octavia resources name formats
servicePrefix = "kube_service_"
lbFormat = "%s%s_%s_%s"
Expand Down Expand Up @@ -146,6 +153,9 @@ type serviceConfig struct {
healthMonitorMaxRetries int
healthMonitorMaxRetriesDown int
preferredIPFamily corev1.IPFamily // preferred (the first) IP family indicated in service's `spec.ipFamilies`
lbTags string
listenerTags string
poolTags string
}

type listenerKey struct {
Expand Down Expand Up @@ -197,6 +207,20 @@ func getLoadbalancerByName(ctx context.Context, client *gophercloud.ServiceClien
return &validLBs[0], nil
}

// mergeTags merges existedTags and desiredTags, returns true if all desiredTags are in existedTags.
func mergeTags(existedTags []string, desiredTags []string) (bool, []string) {
if len(existedTags) == 0 || existedTags == nil {
return false, desiredTags
}
desiredTagsSet := sets.NewString(desiredTags...)
tagSet := sets.NewString(existedTags...)
if tagSet.HasAll(desiredTags...) {
return true, nil
} else {
return false, tagSet.Union(desiredTagsSet).List()
}
}

func popListener(existingListeners []listeners.Listener, id string) []listeners.Listener {
newListeners := []listeners.Listener{}
for _, existingListener := range existingListeners {
Expand Down Expand Up @@ -235,7 +259,13 @@ func (lbaas *LbaasV2) createOctaviaLoadBalancer(ctx context.Context, name, clust
}

if svcConf.supportLBTags {
createOpts.Tags = []string{svcConf.lbName}
var desiredTags []string
if len(svcConf.lbTags) == 0 {
klog.V(4).Infof("No load balancer tags found from service annotation key: %s", ServiceAnnotationLoadBalancerTags)
} else if err := json.Unmarshal([]byte(svcConf.lbTags), &desiredTags); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like an idea to use json inside the annotation. Can you make it simpler? See an example

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, the json format could support more complex tags, and for LB tags we may don't need very complex tags, so I think your point is right, I'll change it.

klog.Warningf("unmarshal service annotation load balancer tags from key: %s, error: %s", ServiceAnnotationLoadBalancerTags, err)
}
createOpts.Tags = append([]string{svcConf.lbName}, desiredTags...)
}

if svcConf.flavorID != "" {
Expand Down Expand Up @@ -923,16 +953,43 @@ func (lbaas *LbaasV2) ensureOctaviaPool(ctx context.Context, lbID string, name s
}
}

var desiredTags []string
if len(svcConf.poolTags) == 0 {
klog.V(4).Infof("No pools tags found from service annotation key: %s", ServiceAnnotationPoolTags)
} else if err := json.Unmarshal([]byte(svcConf.poolTags), &desiredTags); err != nil {
klog.Warningf("unmarshal service annotation pools tags from key: %s, error: %s", ServiceAnnotationPoolTags, err)
}

if pool == nil {
createOpt := lbaas.buildPoolCreateOpt(listener.Protocol, service, svcConf, name)
createOpt.ListenerID = listener.ID

if svcConf.supportLBTags {
createOpt.Tags = desiredTags
}
klog.InfoS("Creating pool", "listenerID", listener.ID, "protocol", createOpt.Protocol)
klog.V(4).Infof("Pool create options: %+v", createOpt)
pool, err = openstackutil.CreatePool(ctx, lbaas.lb, createOpt, lbID)
if err != nil {
return nil, err
}
klog.V(2).Infof("Pool %s created for listener %s", pool.ID, listener.ID)
} else {
if svcConf.supportLBTags {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic feels a bit hard to follow with the nested if statements. Would it be possible to simplify or refactor it to improve readability? The same applies to other parts of new code.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with that, I'd like to do.

// Update tags if needed
if len(desiredTags) > 0 {
klog.V(4).Infof("Desired pools tags: %+v from service annotation key: %s", desiredTags, ServiceAnnotationPoolTags)
if ok, tags := mergeTags(pool.Tags, desiredTags); !ok {
klog.V(4).Infof("Will update pools' tags, current pools tags: %+v, desired tags: %+v", pool.Tags, tags)
updateOpts := v2pools.UpdateOpts{
Tags: &tags,
}
klog.InfoS("Updating pool tags", "poolID", pool.ID, "listenerID", listener.ID, "lbID", lbID, "tags", desiredTags)
if err := openstackutil.UpdatePool(ctx, lbaas.lb, lbID, pool.ID, updateOpts); err != nil {
klog.Warningf("Error updating LB pool tags: %v", err)
}
}
}
}
}

if lbaas.opts.ProviderRequiresSerialAPICalls {
Expand Down Expand Up @@ -1105,6 +1162,23 @@ func (lbaas *LbaasV2) ensureOctaviaListener(ctx context.Context, lbID string, na
newTags = append(newTags, svcConf.lbName)
updateOpts.Tags = &newTags
listenerChanged = true
} else {
// Get desired tags from Service annotations
var desiredTags []string
if len(svcConf.listenerTags) == 0 {
klog.V(4).Infof("No listeners tags found from service annotation key: %s", ServiceAnnotationListenerTags)
} else if err := json.Unmarshal([]byte(svcConf.listenerTags), &desiredTags); err != nil {
klog.Warningf("unmarshal service annotation listeners tags from key: %s, error: %s", ServiceAnnotationListenerTags, err)
}
// Ensure listeners tags match the desired tags from Service annotations
if len(desiredTags) > 0 {
klog.Infof("Desired listener tags: %+v from service annotation key: %s", desiredTags, ServiceAnnotationListenerTags)
if ok, tags := mergeTags(listener.Tags, desiredTags); !ok {
klog.V(4).Infof("Will update listeners' tags, current listeners tags: %+v, desired tags: %+v", listener.Tags, tags)
updateOpts.Tags = &tags
listenerChanged = true
}
}
}
}

Expand Down Expand Up @@ -1177,7 +1251,20 @@ func (lbaas *LbaasV2) buildListenerCreateOpt(ctx context.Context, port corev1.Se
}

if svcConf.supportLBTags {
listenerCreateOpt.Tags = []string{svcConf.lbName}
// Get desired tags from Service annotations
var desiredTags []string
if len(svcConf.listenerTags) == 0 {
klog.V(4).Infof("No listeners tags found from service annotation key: %s", ServiceAnnotationListenerTags)
} else if err := json.Unmarshal([]byte(svcConf.listenerTags), &desiredTags); err != nil {
klog.Warningf("unmarshal service annotation listeners tags from key: %s, error: %s", ServiceAnnotationListenerTags, err)
}
if len(desiredTags) > 0 {
klog.V(4).Infof("Desired listener tags: %+v from service annotation key: %s", desiredTags, ServiceAnnotationListenerTags)
if ok, tags := mergeTags([]string{svcConf.lbName}, desiredTags); !ok {
klog.V(4).Infof("Will update listeners' tags, current listeners tags: %s, desired tags: %+v", svcConf.lbName, tags)
listenerCreateOpt.Tags = tags
}
}
}

if openstackutil.IsOctaviaFeatureSupported(ctx, lbaas.lb, openstackutil.OctaviaFeatureTimeout, lbaas.opts.LBProvider) {
Expand Down Expand Up @@ -1365,6 +1452,11 @@ func (lbaas *LbaasV2) checkService(ctx context.Context, service *corev1.Service,
return fmt.Errorf("no service ports provided")
}

annotations := service.GetAnnotations()
svcConf.lbTags = annotations[ServiceAnnotationLoadBalancerTags]
svcConf.listenerTags = annotations[ServiceAnnotationListenerTags]
svcConf.poolTags = annotations[ServiceAnnotationPoolTags]

if len(service.Spec.IPFamilies) > 0 {
// Since OCCM does not support multiple load-balancers per service yet,
// the first IP family will determine the IP family of the load-balancer
Expand Down Expand Up @@ -1756,6 +1848,25 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
// Make sure LB ID will be saved at this point.
lbaas.updateServiceAnnotation(service, ServiceAnnotationLoadBalancerID, loadbalancer.ID)

if svcConf.supportLBTags {
var desiredTags []string
if len(svcConf.lbTags) == 0 {
klog.V(4).Infof("No load balancer tags found from service annotation key: %s", ServiceAnnotationLoadBalancerTags)
} else if err := json.Unmarshal([]byte(svcConf.lbTags), &desiredTags); err != nil {
klog.Warningf("unmarshal service annotation load balancer tags from key: %s, error: %s", ServiceAnnotationLoadBalancerTags, err)
}
// add the service annotation tags to load balancer tags if the tags don't match
if len(desiredTags) > 0 {
klog.V(4).Infof("Desired load balancer tags: %v from service annotation: %v", desiredTags, ServiceAnnotationLoadBalancerTags)
if ok, tags := mergeTags(loadbalancer.Tags, desiredTags); !ok {
klog.Infof("Will update load balancer's tags, current load balancer tags: %+v, desired tags: %+v", loadbalancer.Tags, tags)
if err := openstackutil.UpdateLoadBalancerTags(ctx, lbaas.lb, loadbalancer.ID, tags); err != nil {
klog.Warningf("failed to update load balancer %s tags: %v", loadbalancer.ID, err)
}
}
}
}

if loadbalancer.ProvisioningStatus != activeStatus {
return nil, fmt.Errorf("load balancer %s is not ACTIVE, current provisioning status: %s", loadbalancer.ID, loadbalancer.ProvisioningStatus)
}
Expand Down