diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index a67114070b..51b9939da5 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -18,6 +18,10 @@ Unreleased
* nothing unreleased
+[8.0.5] - 2026-05-04
+---------------------
+* feat: add TPA pipeline step and social auth disconnect handler
+
[8.0.4] - 2026-05-05
---------------------
* feat: extend manage enterprise customer admins permission to the enterprise_openedx_operator role
diff --git a/Makefile b/Makefile
index 1571d416f9..a7779d5811 100644
--- a/Makefile
+++ b/Makefile
@@ -216,14 +216,26 @@ test-shell-logs: dev.logs
test-shell-restart-container: dev.restart-container
test-shell-attach: dev.attach
-dev.up.keycloak:
+dev.up.keycloak: ## start the Keycloak container for SAML IdP testing
docker compose up --detach keycloak
-dev.stop.keycloak:
+dev.stop.keycloak: ## stop the Keycloak container
docker compose stop keycloak
+dev.provision.keycloak: ## provision Keycloak SAML IdP and LMS TPA records for testing
+ docker compose run --rm keycloak-config-cli
+ docker exec -i --env-file keycloak-devstack.env edx.devstack.lms \
+ python manage.py lms --settings devstack shell < provision-tpa.py
+ @echo ""
+ @echo "NOTE: Your browser machine needs this /etc/hosts entry for SAML login to work:"
+ @echo " 127.0.0.1 edx.devstack.keycloak"
+ @echo ""
+ @echo "If you are using a remote codespace, update /etc/hosts on your local machine"
+ @echo "(where VS Code / the browser runs), not inside the codespace."
+ @echo ""
+
.PHONY: clean clean.static compile_translations coverage docs dummy_translations extract_translations \
fake_translations help pull_translations push_translations requirements dev_requirements test upgrade validate isort \
isort-check static static.dev static.watch quality pylint pycodestyle pii_check pii_clean jasmine \
dev.pull dev.up dev.down dev.stop dev.makemigrations dev.shell dev.logs dev.restart-container dev.attach \
- dev.up.keycloak dev.stop.keycloak
+ dev.up.keycloak dev.stop.keycloak dev.provision.keycloak
diff --git a/docker-compose.yml b/docker-compose.yml
index 5582fd4e24..242fe8c07f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,5 +1,5 @@
-# Docker in this repo is only supported for running tests locally
services:
+ # test-shell is only provided for running tests locally
test-shell:
build:
context: .
@@ -13,22 +13,42 @@ services:
environment:
DJANGO_SETTINGS_MODULE: enterprise.settings.test
+ # Run a IdP in devstack (powered by keycloak).
keycloak:
container_name: "edx.devstack.keycloak"
hostname: keycloak.devstack.edx
- image: quay.io/keycloak/keycloak:22.0.1
+ image: quay.io/keycloak/keycloak:26.6
command: start-dev
networks:
- default:
+ devstack_default:
aliases:
- edx.devstack.keycloak
ports:
- "8080:8080"
environment:
- KEYCLOAK_ADMIN: admin
- KEYCLOAK_ADMIN_PASSWORD: admin
+ KC_BOOTSTRAP_ADMIN_USERNAME: admin
+ KC_BOOTSTRAP_ADMIN_PASSWORD: admin
+ KC_HTTP_HOST: "0.0.0.0"
volumes:
- keycloak_data:/opt/keycloak/data
+ keycloak-config-cli:
+ image: adorsys/keycloak-config-cli:latest-26
+ env_file: keycloak-devstack.env
+ volumes:
+ - ./keycloak-devstack.properties:/opt/keycloak-config-cli.properties:ro
+ - ./keycloak-devstack-realm.json:/config/keycloak-devstack-realm.json:ro
+ command: --spring.config.additional-location=/opt/keycloak-config-cli.properties
+ networks:
+ - devstack_default
+ depends_on:
+ - keycloak
+ profiles:
+ - provision
+
+networks:
+ devstack_default:
+ external: true
+
volumes:
keycloak_data:
diff --git a/docs/index.rst b/docs/index.rst
index 1625f5c360..164b09d0a0 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -17,6 +17,7 @@ Contents:
configuration_and_usage
development
testing
+ saml_testing
internationalization
segment_events
modules
diff --git a/docs/saml_testing.rst b/docs/saml_testing.rst
new file mode 100644
index 0000000000..d8c869a01d
--- /dev/null
+++ b/docs/saml_testing.rst
@@ -0,0 +1,188 @@
+.. _saml-testing-section:
+
+SAML Testing with Keycloak
+==========================
+
+edx-enterprise ships a local `Keycloak `_ setup that
+acts as a SAML Identity Provider (IdP) against a devstack LMS. This lets you
+test the full SAML login flow -- including the enterprise TPA pipeline steps --
+without an external IdP.
+
+Prerequisites
+-------------
+
+* A running `devstack `_ environment with
+ the LMS container up.
+* The edx-enterprise branch you want to test installed as an editable package
+ inside the LMS container (``/edx/src/edx-enterprise``).
+* Docker Compose (the ``docker compose`` CLI plugin).
+
+Starting Keycloak
+-----------------
+
+From the **edx-enterprise** repository root:
+
+.. code-block:: bash
+
+ $ make dev.up.keycloak
+
+This starts a Keycloak 26.x container (``edx.devstack.keycloak``) on the
+devstack Docker network, exposed at ``http://localhost:8080``. Keycloak data is
+persisted in a Docker volume (``keycloak_data``) so the container can be stopped
+and restarted without losing state.
+
+Provisioning
+------------
+
+Provisioning configures **both** Keycloak and the LMS in a single step:
+
+.. code-block:: bash
+
+ $ make dev.provision.keycloak
+
+Under the hood this runs two commands:
+
+1. ``keycloak-config-cli`` imports the realm definition
+ (``keycloak-devstack-realm.json``) into Keycloak, creating a ``devstack``
+ realm with a SAML client and a test user.
+2. ``provision-tpa.py`` runs inside the LMS container to create the matching
+ ``SAMLConfiguration``, ``SAMLProviderConfig``, ``EnterpriseCustomer`` link,
+ and a pre-linked LMS learner account.
+
+All shared configuration values (URLs, entity IDs, OIDs, test credentials) live
+in ``keycloak-devstack.env`` so the two sides stay in sync.
+
+Host setup
+----------
+
+The LMS redirects to Keycloak using the Docker hostname
+``edx.devstack.keycloak``. Your browser needs to resolve that name to
+localhost.
+
+Add this line to ``/etc/hosts`` on the machine where your browser runs (your
+laptop, **not** a remote codespace):
+
+.. code-block:: text
+
+ 127.0.0.1 edx.devstack.keycloak
+
+Testing the SAML login flow
+----------------------------
+
+1. Navigate to the SAML login URL:
+
+ ``http://localhost:18000/auth/login/tpa-saml/?auth_entry=login&idp=keycloak-devstack``
+
+2. You should be redirected to the Keycloak login page at
+ ``http://edx.devstack.keycloak:8080/realms/devstack/...``.
+
+3. Log in with the test credentials:
+
+ ========= =========================
+ Username ``keycloak_learner``
+ Password ``testpass``
+ ========= =========================
+
+4. Validate that you were **not** prompted to log into the existing LMS user.
+ The ``enterprise_associate_by_email`` pipeline step should discover that the
+ pre-provisioned LMS learner is already associated with the SAML-enabled
+ enterprise customer, so LMS authentication is skipped.
+
+5. Validate that you have been redirected to the LMS learner dashboard and are
+ logged in as ``keycloak_test_learner``.
+
+Testing the SAML disconnect flow
+--------------------------------
+
+Triggering the disconnect via the Account MFE
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+1. Bring up the Account MFE container (devstack runs it on port 1997):
+
+ .. code-block:: bash
+
+ $ make dev.up.frontend-app-account
+
+2. In the same browser session where you completed the SAML login, navigate to
+ the Linked Accounts section:
+
+ http://localhost:1997/#linked-accounts
+
+3. Find the Keycloak Devstack IdP entry (matches
+ SAMLProviderConfig.name) and click **Unlink Keycloak Devstack IdP
+ account**.
+
+4. The button should settle into the "unconnected" state with a "Sign in with
+ Keycloak Devstack IdP" link. indicating the MFE received a successful
+ disconnect response.
+
+Verifying the disconnect
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+1. **LMS logs** -- tail the LMS container and grep for the new debug lines:
+
+ .. code-block:: bash
+
+ $ docker logs --tail 500 edx.devstack.lms 2>&1 | grep -E 'SAMLAccountDisconnected|_unlink_enterprise_user_from_idp|successfully unlinked'
+
+ You should see all three lines, in order::
+
+ [THIRD_PARTY_AUTH] Emitting SAMLAccountDisconnected signal for user_id=, backend=tpa-saml
+ [ENTERPRISE] _unlink_enterprise_user_from_idp called for user_id=, backend=tpa-saml
+ Enterprise learner {keycloak_learner@example.com} successfully unlinked from Enterprise Customer {}
+
+Resetting state to repeat the test
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The simplest reset is to re-run provisioning:
+
+.. code-block:: bash
+
+ $ make dev.provision.keycloak
+
+Then navigate to the SAML login URL again to re-link:
+
+ http://localhost:18000/auth/login/tpa-saml/?auth_entry=login&idp=keycloak-devstack
+
+Note: re-running provisioning is necessary because when you clicked the
+**Unlink Keycloak Devstack IdP account** button, the SAML disconnect handler
+did more than just disconnect from the IdP, it also unlinked the
+EnterpriseCustomerUser. This is only recoverable by an admin or system
+operator, hence the need to use the provision script. Yes, that means in prod
+if a learner accidentally clicks the unlink-from-IdP button, they ALSO get
+unlinked from the enterprise itself and need to reach out to their admin to get
+re-linked to the enterprise.
+
+Stopping Keycloak
+-----------------
+
+.. code-block:: bash
+
+ $ make dev.stop.keycloak
+
+The ``keycloak_data`` volume is preserved, so the next ``make dev.up.keycloak``
+will resume with the same realm and user data.
+
+Troubleshooting
+---------------
+
+**saml --pull fails during provisioning**
+ The provisioning script runs ``saml --pull`` to fetch metadata from *all*
+ enabled SAML providers. If a pre-existing ``SAMLProviderConfig`` in your
+ devstack points to an unreachable metadata URL, the command will fail.
+ Audit the provider list in the LMS Django admin at
+ ``http://localhost:18000/admin/third_party_auth/samlproviderconfig/?show_history=1``.
+
+**Browser cannot reach edx.devstack.keycloak**
+ Verify the ``/etc/hosts`` entry described above. If you are using a GitHub
+ Codespace or other remote environment, the entry must be on the machine
+ running your browser, not inside the remote environment.
+
+**Keycloak admin console**
+ The Keycloak admin console is available at
+ ``http://localhost:8080/admin/master/console/`` with credentials
+ ``admin`` / ``admin``.
+
+**Account MFE shows no linked providers**
+ UserSocialAuth likely has no row for this user -- complete the SAML
+ login first.
diff --git a/enterprise/__init__.py b/enterprise/__init__.py
index e72aed73ef..9d947a2989 100644
--- a/enterprise/__init__.py
+++ b/enterprise/__init__.py
@@ -2,4 +2,4 @@
Your project description goes here.
"""
-__version__ = "8.0.4"
+__version__ = "8.0.5"
diff --git a/enterprise/apps.py b/enterprise/apps.py
index 79eae8c312..e976d80f2a 100644
--- a/enterprise/apps.py
+++ b/enterprise/apps.py
@@ -3,8 +3,9 @@
"""
from django.apps import AppConfig, apps
+from django.db.models.signals import post_save, pre_migrate
-from enterprise.constants import USER_POST_SAVE_DISPATCH_UID
+from enterprise.constants import SAML_ACCOUNT_DISCONNECTED_DISPATCH_UID, USER_POST_SAVE_DISPATCH_UID
class EnterpriseConfig(AppConfig):
@@ -39,16 +40,27 @@ def ready(self):
"""
Perform other one-time initialization steps.
"""
- from enterprise.signals import handle_user_post_save # pylint: disable=import-outside-toplevel
-
- from django.db.models.signals import post_save, pre_migrate # pylint: disable=C0415, # isort:skip
+ from enterprise.signals import ( # pylint: disable=import-outside-toplevel
+ handle_social_auth_disconnect,
+ handle_user_post_save,
+ )
post_save.connect(handle_user_post_save, sender=self.auth_user_model, dispatch_uid=USER_POST_SAVE_DISPATCH_UID)
pre_migrate.connect(self._disconnect_user_post_save_for_migrations)
+ try:
+ # pylint: disable=import-outside-toplevel
+ from common.djangoapps.third_party_auth.signals import SAMLAccountDisconnected
+ except ImportError:
+ pass
+ else:
+ SAMLAccountDisconnected.connect(
+ handle_social_auth_disconnect,
+ dispatch_uid=SAML_ACCOUNT_DISCONNECTED_DISPATCH_UID,
+ )
+
def _disconnect_user_post_save_for_migrations(self, sender, **kwargs): # pylint: disable=unused-argument
"""
Handle pre_migrate signal - disconnect User post_save handler.
"""
- from django.db.models.signals import post_save # pylint: disable=import-outside-toplevel
post_save.disconnect(sender=self.auth_user_model, dispatch_uid=USER_POST_SAVE_DISPATCH_UID)
diff --git a/enterprise/constants.py b/enterprise/constants.py
index aae9e66336..381a1e693a 100644
--- a/enterprise/constants.py
+++ b/enterprise/constants.py
@@ -6,7 +6,11 @@
# We listen to the User post_save signal in order to associate new users
# with an EnterpriseCustomer when applicable. This it the unique identifier
# used to ensure that signal receiver is only called once.
-USER_POST_SAVE_DISPATCH_UID = "user_post_save_upgrade_pending_enterprise_customer_user"
+USER_POST_SAVE_DISPATCH_UID = "enterprise.user_post_save_upgrade_pending_enterprise_customer_user"
+
+# We listen to the SAMLAccountDisconnected signal to unlink learners from their
+# enterprise identity provider.
+SAML_ACCOUNT_DISCONNECTED_DISPATCH_UID = "enterprise.handle_social_auth_disconnect"
# Data sharing consent messages
diff --git a/enterprise/settings/common.py b/enterprise/settings/common.py
index 8432d2d945..a29402e20c 100644
--- a/enterprise/settings/common.py
+++ b/enterprise/settings/common.py
@@ -3,7 +3,7 @@
"""
-def plugin_settings(settings): # pylint: disable=unused-argument
+def plugin_settings(settings):
"""
Override platform settings for the enterprise app.
@@ -13,3 +13,18 @@ def plugin_settings(settings): # pylint: disable=unused-argument
Args:
settings: The Django settings module being configured.
"""
+ pipeline = getattr(settings, 'SOCIAL_AUTH_PIPELINE', None)
+ if pipeline is not None:
+ email_step = 'enterprise.tpa_pipeline.enterprise_associate_by_email'
+ oauth_step = 'common.djangoapps.third_party_auth.pipeline.associate_by_email_if_oauth'
+ if email_step not in pipeline:
+ # pipeline.index() intentionally raises ValueError if the reference step is
+ # missing — this prevents Django from starting with a misconfigured pipeline.
+ pipeline.insert(pipeline.index(oauth_step), email_step)
+
+ logistration_step = 'enterprise.tpa_pipeline.handle_enterprise_logistration'
+ associate_step = 'social_core.pipeline.social_auth.associate_user'
+ if logistration_step not in pipeline:
+ # pipeline.index() intentionally raises ValueError if the reference step is
+ # missing — this prevents Django from starting with a misconfigured pipeline.
+ pipeline.insert(pipeline.index(associate_step) + 1, logistration_step)
diff --git a/enterprise/settings/test.py b/enterprise/settings/test.py
index a2c8a5dfb6..5e359aed4d 100644
--- a/enterprise/settings/test.py
+++ b/enterprise/settings/test.py
@@ -211,6 +211,13 @@ def root(*args):
TIME_ZONE = 'UTC'
+# Business logic should be allowed to assume that FEATURES is set. In a
+# running app, it's set by the platform, but in enterprise unit tests we'll
+# just seed one here.
+FEATURES = {
+ 'ENABLE_ENTERPRISE_INTEGRATION': True,
+}
+
MKTG_URLS = {}
ENTERPRISE_CUSTOMER_CATALOG_DEFAULT_CONTENT_FILTER = {
diff --git a/enterprise/signals.py b/enterprise/signals.py
index 472dc26edb..4577b4833c 100644
--- a/enterprise/signals.py
+++ b/enterprise/signals.py
@@ -3,11 +3,16 @@
"""
from logging import getLogger
+from typing import Any
+
+from social_core.backends.saml import SAMLAuth
from django.conf import settings
+from django.contrib.auth.models import AbstractUser
from django.db import transaction
from django.db.models.signals import post_delete, post_save, pre_save
from django.dispatch import receiver
+from django.http import HttpRequest
from enterprise import models, roles_api
from enterprise.api import activate_admin_permissions
@@ -40,6 +45,16 @@
COURSE_UNENROLLMENT_COMPLETED = None
USER_RETIRE_LMS_CRITICAL = None
+try:
+ from common.djangoapps.third_party_auth.provider import Registry
+except ImportError:
+ Registry = None
+
+try:
+ from openedx.features.enterprise_support.api import enterprise_customer_for_request
+except ImportError:
+ enterprise_customer_for_request = None
+
logger = getLogger(__name__)
_UNSAVED_FILEFIELD = 'unsaved_filefield'
INTEGRATED_CHANNELS = [
@@ -476,3 +491,67 @@ def retire_user_from_pending_enterprise_customer_user(sender, user, retired_emai
if USER_RETIRE_LMS_CRITICAL is not None:
USER_RETIRE_LMS_CRITICAL.connect(retire_user_from_pending_enterprise_customer_user)
+
+
+def _unlink_enterprise_user_from_idp(request: HttpRequest, user: AbstractUser, idp_backend_name: str) -> None:
+ """
+ Un-links learner from their enterprise identity provider.
+
+ Note: despite the name, this also flips ``EnterpriseCustomerUser.linked``
+ to ``False``, severing the learner's enterprise membership entirely -- not
+ just their IdP association. Re-linking is not learner-self-service and
+ requires admin intervention via the enterprise customer API or Django
+ admin. This is legacy behavior preserved during a refactor; whether it was
+ a conscious product decision is unclear.
+
+ Args:
+ request: The current HTTP request.
+ user (User): User who initiated disconnect request.
+ idp_backend_name (str): Name of identity provider's backend.
+ """
+ logger.info(
+ '[ENTERPRISE] _unlink_enterprise_user_from_idp called for user_id=%s, backend=%s',
+ user.id, idp_backend_name,
+ )
+ enterprise_customer = enterprise_customer_for_request(request)
+ if enterprise_customer:
+ enabled_providers = Registry.get_enabled_by_backend_name(idp_backend_name)
+ provider_ids = [enabled_provider.provider_id for enabled_provider in enabled_providers]
+ enterprise_customer_idps = models.EnterpriseCustomerIdentityProvider.objects.filter(
+ enterprise_customer__uuid=enterprise_customer['uuid'],
+ provider_id__in=provider_ids
+ )
+
+ if enterprise_customer_idps:
+ try:
+ # Unlink user email from each Enterprise Customer.
+ for enterprise_customer_idp in enterprise_customer_idps:
+ models.EnterpriseCustomerUser.objects.unlink_user(
+ enterprise_customer=enterprise_customer_idp.enterprise_customer,
+ user_email=user.email,
+ )
+ except (models.EnterpriseCustomerUser.DoesNotExist, models.PendingEnterpriseCustomerUser.DoesNotExist):
+ pass
+
+
+def handle_social_auth_disconnect(
+ sender: type, # pylint: disable=unused-argument
+ *, # Force everything that follows to be a keyword argument.
+ request: HttpRequest,
+ user: AbstractUser,
+ saml_backend: SAMLAuth,
+ **kwargs: Any,
+) -> None:
+ """
+ Handle SAMLAccountDisconnected signal to unlink enterprise user from IdP.
+
+ Arguments:
+ sender: the class that sent the signal (unused).
+ request: the HTTP request during which the disconnect occurred.
+ user: the Django User disconnecting the social auth account.
+ saml_backend: the SAML auth backend instance.
+ **kwargs: forward-compatible catch-all.
+ """
+ if not settings.FEATURES.get('ENABLE_ENTERPRISE_INTEGRATION', False):
+ return
+ _unlink_enterprise_user_from_idp(request, user, saml_backend.name)
diff --git a/enterprise/tpa_pipeline.py b/enterprise/tpa_pipeline.py
index e10c89e4aa..94fb74db81 100644
--- a/enterprise/tpa_pipeline.py
+++ b/enterprise/tpa_pipeline.py
@@ -2,24 +2,109 @@
Module provides elements to be used in third-party auth pipeline.
"""
+import logging
import re
from datetime import datetime
from logging import getLogger
from social_core.pipeline.partial import partial
+from social_core.pipeline.social_auth import associate_by_email
from social_django.models import UserSocialAuth
+from django.conf import settings
from django.urls import reverse
-from enterprise.models import EnterpriseCustomer, EnterpriseCustomerUser
-from enterprise.utils import get_identity_provider, get_social_auth_from_idp
+from enterprise.models import EnterpriseCustomer, EnterpriseCustomerIdentityProvider, EnterpriseCustomerUser
+from enterprise.utils import get_identity_provider, get_social_auth_from_idp, get_user_from_email
try:
from common.djangoapps.third_party_auth.provider import Registry
+ from common.djangoapps.third_party_auth.utils import is_saml_provider
except ImportError:
Registry = None
+ is_saml_provider = None
LOGGER = getLogger(__name__)
+log = logging.getLogger(__name__)
+
+
+def _is_enterprise_customer_user(provider_id, user):
+ """
+ Verify that the user is linked to the enterprise customer of the given identity provider.
+ """
+ enterprise_idp = EnterpriseCustomerIdentityProvider.objects.get(provider_id=provider_id)
+
+ return EnterpriseCustomerUser.objects.filter(
+ enterprise_customer=enterprise_idp.enterprise_customer,
+ user_id=user.id,
+ ).exists()
+
+
+def enterprise_associate_by_email(strategy, details, user=None, *args, **kwargs): # pylint: disable=keyword-arg-before-vararg
+ """
+ SAML pipeline step: associate the auth user with an existing user by email when the existing user is an enterprise
+ customer user for the current provider.
+
+ Note: It defers to the social library's associate_by_email implementation, which verifies that only a single
+ database user is associated with the email.
+
+ ENT-11577: This step replaces the now-defunct ``associate_by_email_if_saml`` step from openedx-platform.
+ """
+ if not getattr(settings, 'FEATURES', {}).get('ENABLE_ENTERPRISE_INTEGRATION', False):
+ return None
+
+ def associate_by_email_if_enterprise_user(current_user, current_provider):
+ """
+ If the learner arriving via SAML is already linked to the enterprise customer linked to the same IdP,
+ they should not be prompted for their edX password.
+ """
+ try:
+ is_enterprise_customer_user = _is_enterprise_customer_user(current_provider.provider_id, current_user)
+
+ log.info(
+ f'[Multiple_SSO_SAML_Accounts_Association_to_User] Enterprise user verification: '
+ f'User Email: {current_user.email}, User ID: {current_user.id}, '
+ f'Provider ID: {current_provider.provider_id}, '
+ f'is_enterprise_customer_user: {is_enterprise_customer_user}'
+ )
+
+ if is_enterprise_customer_user:
+ association_response = associate_by_email(
+ details=details, user=user, *args, **kwargs
+ )
+
+ if association_response and association_response.get('user'):
+ if not association_response['user'].is_active:
+ log.info(
+ f'[Multiple_SSO_SAML_Accounts_Association_to_User] User association account is not active: '
+ f'User Email: {current_user.email}, User ID: {current_user.id}, '
+ f'Provider ID: {current_provider.provider_id}, '
+ f'is_enterprise_customer_user: {is_enterprise_customer_user}'
+ )
+ return None
+
+ return association_response
+
+ except Exception: # pylint: disable=broad-except
+ log.exception(
+ f'[Multiple_SSO_SAML_Accounts_Association_to_User] Error in saml multiple accounts association: '
+ f'User Email: {current_user.email}, User ID: {current_user.id}, '
+ f'Provider ID: {current_provider.provider_id}'
+ )
+ return None
+
+ saml_provider, current_provider = is_saml_provider(strategy.request.backend.name, kwargs)
+
+ if saml_provider:
+ # Get the user by matching email if the pipeline user is not available.
+ current_user = user if user else get_user_from_email(details.get('email'))
+
+ # Verify that the user is linked to enterprise customer of current identity provider and is active.
+ associate_response = (
+ associate_by_email_if_enterprise_user(current_user, current_provider) if current_user else None
+ )
+ if associate_response:
+ return associate_response
def get_sso_provider(request, pipeline):
diff --git a/enterprise/utils.py b/enterprise/utils.py
index de27fa1cf8..d929577a5b 100644
--- a/enterprise/utils.py
+++ b/enterprise/utils.py
@@ -22,6 +22,7 @@
from django.apps import apps
from django.conf import settings
from django.contrib import auth
+from django.contrib.auth.models import AbstractUser
from django.core import mail
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.core.validators import validate_email
@@ -1719,7 +1720,7 @@ def get_idiff_list(list_a, list_b):
return list(set(lower_list_a) - set(lower_list_b))
-def get_users_by_email(emails):
+def get_users_by_email(emails: list[str]) -> tuple[QuerySet, list[str]]:
"""
Accept a list of emails, and separate them into users that exist on OpenEdX and users who don't.
@@ -1736,6 +1737,15 @@ def get_users_by_email(emails):
return users, unregistered_emails
+def get_user_from_email(email: str) -> AbstractUser | None:
+ """
+ Return a single user matching the email, or None.
+ """
+ if email:
+ return User.objects.filter(email=email).first()
+ return None
+
+
def is_user_enrolled(user, course_id, course_mode, enrollment_client=None):
"""
Query the enrollment API and determine if a learner is enrolled in a given course run track.
diff --git a/keycloak-devstack-realm.json b/keycloak-devstack-realm.json
new file mode 100644
index 0000000000..0da3a3d669
--- /dev/null
+++ b/keycloak-devstack-realm.json
@@ -0,0 +1,101 @@
+{
+ "realm": "$(env:REALM_NAME)",
+ "enabled": true,
+ "clientScopes": [
+ {
+ "name": "saml-user-attributes",
+ "protocol": "saml",
+ "protocolMappers": [
+ {
+ "name": "email",
+ "protocol": "saml",
+ "protocolMapper": "saml-user-property-mapper",
+ "consentRequired": false,
+ "config": {
+ "user.attribute": "email",
+ "friendly.name": "email",
+ "attribute.name": "$(env:OID_EMAIL)",
+ "attribute.nameformat": "URI Reference"
+ }
+ },
+ {
+ "name": "firstName",
+ "protocol": "saml",
+ "protocolMapper": "saml-user-property-mapper",
+ "consentRequired": false,
+ "config": {
+ "user.attribute": "firstName",
+ "friendly.name": "givenName",
+ "attribute.name": "$(env:OID_GIVEN_NAME)",
+ "attribute.nameformat": "URI Reference"
+ }
+ },
+ {
+ "name": "lastName",
+ "protocol": "saml",
+ "protocolMapper": "saml-user-property-mapper",
+ "consentRequired": false,
+ "config": {
+ "user.attribute": "lastName",
+ "friendly.name": "sn",
+ "attribute.name": "$(env:OID_SURNAME)",
+ "attribute.nameformat": "URI Reference"
+ }
+ }
+ ]
+ },
+ {
+ "name": "role_list",
+ "protocol": "saml",
+ "protocolMappers": [
+ {
+ "name": "role list",
+ "protocol": "saml",
+ "protocolMapper": "saml-role-list-mapper",
+ "consentRequired": false,
+ "config": {
+ "single": "true",
+ "attribute.nameformat": "Basic",
+ "attribute.name": "Role"
+ }
+ }
+ ]
+ }
+ ],
+ "clients": [
+ {
+ "clientId": "$(env:SP_ENTITY_ID)",
+ "protocol": "saml",
+ "enabled": true,
+ "rootUrl": "$(env:SP_ENTITY_ID)",
+ "redirectUris": ["$(env:ACS_URL)*"],
+ "adminUrl": "$(env:ACS_URL)",
+ "attributes": {
+ "saml.assertion.signature": "true",
+ "saml.force.post.binding": "true",
+ "saml_assertion_consumer_url_post": "$(env:ACS_URL)",
+ "saml_name_id_format": "email",
+ "saml.force.name.id.format": "true",
+ "saml.client.signature": "false"
+ },
+ "defaultClientScopes": ["saml-user-attributes", "role_list"]
+ }
+ ],
+ "users": [
+ {
+ "username": "$(env:TEST_USERNAME)",
+ "email": "$(env:TEST_EMAIL)",
+ "emailVerified": true,
+ "enabled": true,
+ "firstName": "$(env:TEST_FIRST_NAME)",
+ "lastName": "$(env:TEST_LAST_NAME)",
+ "credentials": [
+ {
+ "type": "password",
+ "value": "$(env:TEST_PASSWORD)",
+ "temporary": false
+ }
+ ]
+ }
+ ]
+}
diff --git a/keycloak-devstack.env b/keycloak-devstack.env
new file mode 100644
index 0000000000..fee97a33fe
--- /dev/null
+++ b/keycloak-devstack.env
@@ -0,0 +1,47 @@
+# Environment variables for keycloak-config-cli variable substitution.
+# These are the single source of truth for values shared between the Keycloak
+# realm config (keycloak-devstack-realm.json) and LMS setup (runbooks/07.md).
+
+# Keycloak realm name. All SAML IdP objects live inside this realm.
+REALM_NAME=devstack
+
+# Keycloak base URL (Docker hostname, reachable from the LMS container and host).
+KEYCLOAK_URL=http://edx.devstack.keycloak:8080
+
+# SAMLProviderConfig slug used in the LMS. The provider_id registered with
+# python-social-auth will be "saml-{SAML_SLUG}" (e.g. "saml-test-saml-idp").
+SAML_SLUG=keycloak-devstack
+
+# LMS Service Provider entity ID. Must match SAMLConfiguration.entity_id in
+# the LMS (runbook step 3b) AND the Client ID of the SAML client in Keycloak.
+SP_ENTITY_ID=http://localhost:18000
+
+# SAML Assertion Consumer Service URL — the LMS endpoint that receives the SAML
+# response POST from Keycloak after the user authenticates.
+ACS_URL=http://localhost:18000/auth/complete/tpa-saml/
+
+# Standard OIDs for SAML assertion attributes. The SAMLProviderConfig in the
+# LMS (runbook step 3c) must reference the same OIDs so the pipeline can
+# extract user details from the assertion.
+# OID_EMAIL: RFC 2798 mail
+# OID_GIVEN_NAME: X.520 givenName
+# OID_SURNAME: X.520 sn (surname)
+OID_EMAIL=urn:oid:0.9.2342.19200300.100.1.3
+OID_GIVEN_NAME=urn:oid:2.5.4.42
+OID_SURNAME=urn:oid:2.5.4.4
+
+# Keycloak test user credentials. A user with these attributes is created in
+# the Keycloak realm for SAML login testing. TEST_USERNAME intentionally differs
+# from LMS_USERNAME to verify that SAML association works by email matching, not
+# username matching.
+TEST_USERNAME=keycloak_learner
+TEST_EMAIL=keycloak_learner@example.com
+TEST_PASSWORD=testpass
+TEST_FIRST_NAME=Keycloak
+TEST_LAST_NAME=Learner
+
+# LMS test user credentials. This LMS user is pre-linked to the enterprise
+# customer so that enterprise_associate_by_email can match them to the Keycloak
+# user above during SAML login. The email must match TEST_EMAIL.
+LMS_USERNAME=keycloak_test_learner
+LMS_PASSWORD=edx
diff --git a/keycloak-devstack.properties b/keycloak-devstack.properties
new file mode 100644
index 0000000000..e67a20c7cc
--- /dev/null
+++ b/keycloak-devstack.properties
@@ -0,0 +1,17 @@
+# keycloak-config-cli configuration for devstack provisioning.
+# See https://adorsys.github.io/keycloak-config-cli/installation/
+
+# Target Keycloak instance (Docker hostname on the devstack network).
+keycloak.url=http://edx.devstack.keycloak:8080
+keycloak.user=admin
+keycloak.password=admin
+
+# Realm JSON file to import (mounted into the container at /config/).
+import.files.locations=/config/keycloak-devstack-realm.json
+
+# Enable $(VAR) substitution in the realm JSON so that environment variables
+# from keycloak-devstack.env are resolved at import time.
+import.var-substitution.enabled=true
+
+# Wait for Keycloak to be healthy before attempting the import.
+keycloak.availability-check.enabled=true
diff --git a/provision-tpa.py b/provision-tpa.py
new file mode 100644
index 0000000000..2d3bb5bf5e
--- /dev/null
+++ b/provision-tpa.py
@@ -0,0 +1,196 @@
+"""
+Provision SAML / TPA records in the LMS for Keycloak IdP testing.
+
+Run inside the LMS container via:
+ manage.py lms shell < provision-tpa.py
+
+All configuration is read from environment variables (see keycloak-devstack.env).
+"""
+import os
+import sys
+
+from django.conf import settings
+from django.contrib.auth import get_user_model
+from django.contrib.sites.models import Site
+from django.core.management import call_command
+
+from common.djangoapps.student.models import UserProfile
+from common.djangoapps.third_party_auth.models import (
+ SAMLConfiguration,
+ SAMLProviderConfig,
+ SAMLProviderData,
+)
+from enterprise.models import (
+ EnterpriseCustomer,
+ EnterpriseCustomerIdentityProvider,
+ EnterpriseCustomerUser,
+)
+
+User = get_user_model()
+
+# ---------------------------------------------------------------------------
+# Read configuration from environment (sourced from keycloak-devstack.env)
+# ---------------------------------------------------------------------------
+KEYCLOAK_URL = os.environ['KEYCLOAK_URL']
+REALM_NAME = os.environ['REALM_NAME']
+SP_ENTITY_ID = os.environ['SP_ENTITY_ID']
+SAML_SLUG = os.environ['SAML_SLUG']
+OID_EMAIL = os.environ['OID_EMAIL']
+OID_GIVEN_NAME = os.environ['OID_GIVEN_NAME']
+OID_SURNAME = os.environ['OID_SURNAME']
+TEST_USERNAME = os.environ['TEST_USERNAME']
+TEST_EMAIL = os.environ['TEST_EMAIL']
+TEST_PASSWORD = os.environ['TEST_PASSWORD']
+TEST_FIRST_NAME = os.environ['TEST_FIRST_NAME']
+TEST_LAST_NAME = os.environ['TEST_LAST_NAME']
+LMS_USERNAME = os.environ['LMS_USERNAME']
+LMS_PASSWORD = os.environ['LMS_PASSWORD']
+
+# Derived constants
+IDP_ENTITY_ID = f'{KEYCLOAK_URL}/realms/{REALM_NAME}'
+IDP_METADATA_URL = f'{IDP_ENTITY_ID}/protocol/saml/descriptor'
+PROVIDER_ID = f'saml-{SAML_SLUG}'
+
+# ---------------------------------------------------------------------------
+# Step 1: Seed enterprise devstack data
+# ---------------------------------------------------------------------------
+print('\n--- Step 1: Seed enterprise devstack data ---')
+call_command('seed_enterprise_devstack_data')
+
+# ---------------------------------------------------------------------------
+# Step 2: Create SAMLConfiguration (global SP config)
+# ---------------------------------------------------------------------------
+print('\n--- Step 2: Create SAMLConfiguration ---')
+site = Site.objects.get_current()
+# SAMLConfiguration is a ConfigurationModel with KEY_FIELDS = ('site_id', 'slug').
+# Multiple rows per (site, slug) is expected — each row is a version.
+# Just create a new version with the desired settings.
+saml_config = SAMLConfiguration(
+ site=site,
+ slug='default',
+ enabled=True,
+ entity_id=SP_ENTITY_ID,
+)
+saml_config.save()
+print(f'Created SAMLConfiguration version (slug=default, entity_id={SP_ENTITY_ID})')
+
+# ---------------------------------------------------------------------------
+# Step 3: Create SAMLProviderConfig (Keycloak IdP)
+# ---------------------------------------------------------------------------
+print('\n--- Step 3: Create SAMLProviderConfig ---')
+# SAMLProviderConfig is a ConfigurationModel with KEY_FIELDS = ('slug',).
+# Multiple rows per slug is expected — each row is a version.
+provider_config = SAMLProviderConfig(
+ site=site,
+ slug=SAML_SLUG,
+ name='Keycloak Devstack IdP',
+ entity_id=IDP_ENTITY_ID,
+ metadata_source=IDP_METADATA_URL,
+ enabled=True,
+ visible=True,
+ skip_registration_form=True,
+ skip_email_verification=True,
+ send_to_registration_first=True,
+ attr_user_permanent_id=OID_EMAIL,
+ attr_email=OID_EMAIL,
+ attr_first_name=OID_GIVEN_NAME,
+ attr_last_name=OID_SURNAME,
+)
+provider_config.save()
+print(f'Created SAMLProviderConfig version (slug={SAML_SLUG}, entity_id={IDP_ENTITY_ID})')
+
+# ---------------------------------------------------------------------------
+# Step 4: Create EnterpriseCustomerIdentityProvider
+# ---------------------------------------------------------------------------
+print('\n--- Step 4: Create EnterpriseCustomerIdentityProvider ---')
+ec = EnterpriseCustomer.objects.get(slug='test-enterprise')
+ecidp, created = EnterpriseCustomerIdentityProvider.objects.get_or_create(
+ provider_id=PROVIDER_ID,
+ enterprise_customer=ec,
+)
+action = 'Created' if created else 'Already exists'
+print(f'{action}: provider_id={PROVIDER_ID}, enterprise={ec.name}')
+
+# ---------------------------------------------------------------------------
+# Step 5: Fetch SAML metadata from Keycloak
+# ---------------------------------------------------------------------------
+print('\n--- Step 5: Fetch SAML metadata (saml --pull) ---')
+# saml --pull fetches metadata for ALL enabled providers. If any pre-existing
+# provider has an unreachable metadata URL, the command will fail.
+try:
+ call_command('saml', pull=True)
+except Exception as exc:
+ print(f'\nERROR: saml --pull failed: {exc}')
+ print('This usually means a pre-existing SAMLProviderConfig has an unreachable metadata URL.')
+ print(f'Audit providers at: {SP_ENTITY_ID}/admin/third_party_auth/samlproviderconfig/?show_history=1')
+ sys.exit(1)
+
+# ---------------------------------------------------------------------------
+# Step 6: Verify SAMLProviderData
+# ---------------------------------------------------------------------------
+print('\n--- Step 6: Verify SAMLProviderData ---')
+provider_data = SAMLProviderData.objects.filter(entity_id=IDP_ENTITY_ID)
+if provider_data.exists():
+ d = provider_data.latest('fetched_at')
+ print(f'SAMLProviderData fetched at {d.fetched_at}')
+ print(f'SSO URL: {d.sso_url}')
+ print(f'Public key present: {bool(d.public_key)}')
+else:
+ print('ERROR: No SAMLProviderData found. Check saml --pull output above.')
+ sys.exit(1)
+
+# ---------------------------------------------------------------------------
+# Step 7: Verify pipeline injection
+# ---------------------------------------------------------------------------
+print('\n--- Step 7: Verify pipeline injection ---')
+pipeline = settings.SOCIAL_AUTH_PIPELINE
+email_step = 'enterprise.tpa_pipeline.enterprise_associate_by_email'
+logistration_step = 'enterprise.tpa_pipeline.handle_enterprise_logistration'
+missing = []
+if email_step not in pipeline:
+ missing.append(email_step)
+if logistration_step not in pipeline:
+ missing.append(logistration_step)
+if missing:
+ print(f'ERROR: Missing pipeline steps: {missing}')
+ sys.exit(1)
+print('Pipeline injection verified (enterprise_associate_by_email, handle_enterprise_logistration)')
+
+# ---------------------------------------------------------------------------
+# Step 8: Create pre-linked enterprise learner
+# ---------------------------------------------------------------------------
+print('\n--- Step 8: Create pre-linked enterprise learner ---')
+learner, created = User.objects.get_or_create(
+ username=LMS_USERNAME,
+ defaults={
+ 'email': TEST_EMAIL,
+ 'is_active': True,
+ },
+)
+if created:
+ learner.set_password(LMS_PASSWORD)
+ learner.save()
+ print(f'Created LMS user: {learner.username} (email={learner.email})')
+else:
+ print(f'LMS user already exists: {learner.username}')
+
+UserProfile.objects.get_or_create(
+ user=learner,
+ defaults={'name': f'{TEST_FIRST_NAME} {TEST_LAST_NAME}'},
+)
+print('UserProfile ensured')
+
+ecu, created = EnterpriseCustomerUser.objects.get_or_create(
+ enterprise_customer=ec,
+ user_id=learner.id,
+ defaults={'active': True},
+)
+action = 'Created' if created else 'Already exists'
+print(f'EnterpriseCustomerUser {action}: active={ecu.active}')
+
+# ---------------------------------------------------------------------------
+# Done
+# ---------------------------------------------------------------------------
+print('\n=== LMS TPA provisioning complete ===')
+print(f'SAML login URL: http://localhost:18000/auth/login/tpa-saml/?auth_entry=login&idp={SAML_SLUG}')
+print(f'Keycloak login: {TEST_USERNAME} / {TEST_PASSWORD}')
diff --git a/requirements/base.in b/requirements/base.in
index 2eace22f4a..f874c71f51 100644
--- a/requirements/base.in
+++ b/requirements/base.in
@@ -38,6 +38,7 @@ paramiko
path.py
pillow
python-dateutil
+python3-saml
pytz
requests
rules
diff --git a/requirements/dev.txt b/requirements/dev.txt
index 51bb5e908e..6fcbc8eae2 100644
--- a/requirements/dev.txt
+++ b/requirements/dev.txt
@@ -579,6 +579,12 @@ invoke==3.0.3
# -r requirements/test-master.txt
# -r requirements/test.txt
# paramiko
+isodate==0.7.2
+ # via
+ # -r requirements/doc.txt
+ # -r requirements/test-master.txt
+ # -r requirements/test.txt
+ # python3-saml
isort==6.1.0
# via
# -r requirements/dev.in
@@ -620,6 +626,8 @@ lxml[html-clean]==5.3.2
# -r requirements/test.txt
# edx-i18n-tools
# lxml-html-clean
+ # python3-saml
+ # xmlsec
lxml-html-clean==0.4.4
# via lxml
markupsafe==3.0.3
@@ -880,6 +888,11 @@ python3-openid==3.2.0 ; python_version >= "3"
# -r requirements/test-master.txt
# -r requirements/test.txt
# social-auth-core
+python3-saml==1.16.0
+ # via
+ # -r requirements/doc.txt
+ # -r requirements/test-master.txt
+ # -r requirements/test.txt
pytz==2026.1.post1
# via
# -r requirements/doc.txt
@@ -1142,6 +1155,12 @@ wheel==0.47.0
# via
# -r requirements/dev.in
# pip-tools
+xmlsec==1.3.14
+ # via
+ # -r requirements/doc.txt
+ # -r requirements/test-master.txt
+ # -r requirements/test.txt
+ # python3-saml
# The following packages are considered to be unsafe in a requirements file:
# pip
diff --git a/requirements/doc.txt b/requirements/doc.txt
index 5bcc3f9b71..6c634e6110 100644
--- a/requirements/doc.txt
+++ b/requirements/doc.txt
@@ -355,6 +355,10 @@ invoke==3.0.3
# via
# -r requirements/test-master.txt
# paramiko
+isodate==0.7.2
+ # via
+ # -r requirements/test-master.txt
+ # python3-saml
jinja2==3.1.6
# via
# -r requirements/test-master.txt
@@ -373,7 +377,10 @@ kombu==5.6.2
# -r requirements/test-master.txt
# celery
lxml==5.3.2
- # via -r requirements/test-master.txt
+ # via
+ # -r requirements/test-master.txt
+ # python3-saml
+ # xmlsec
markupsafe==3.0.3
# via
# -r requirements/test-master.txt
@@ -497,6 +504,8 @@ python3-openid==3.2.0 ; python_version >= "3"
# via
# -r requirements/test-master.txt
# social-auth-core
+python3-saml==1.16.0
+ # via -r requirements/test-master.txt
pytz==2026.1.post1
# via
# -r requirements/test-master.txt
@@ -650,6 +659,10 @@ webencodings==0.5.1
# via
# -r requirements/test-master.txt
# bleach
+xmlsec==1.3.14
+ # via
+ # -r requirements/test-master.txt
+ # python3-saml
# The following packages are considered to be unsafe in a requirements file:
# setuptools
diff --git a/requirements/test-master.txt b/requirements/test-master.txt
index b1a621ec27..dd14081e8f 100644
--- a/requirements/test-master.txt
+++ b/requirements/test-master.txt
@@ -367,6 +367,10 @@ invoke==3.0.3
# via
# -c requirements/edx-platform-constraints.txt
# paramiko
+isodate==0.7.2
+ # via
+ # -c requirements/edx-platform-constraints.txt
+ # python3-saml
jinja2==3.1.6
# via
# -c requirements/edx-platform-constraints.txt
@@ -391,6 +395,8 @@ lxml==5.3.2
# via
# -c requirements/edx-platform-constraints.txt
# -r requirements/base.in
+ # python3-saml
+ # xmlsec
markupsafe==3.0.3
# via
# -c requirements/edx-platform-constraints.txt
@@ -511,6 +517,10 @@ python3-openid==3.2.0 ; python_version >= "3"
# via
# -c requirements/edx-platform-constraints.txt
# social-auth-core
+python3-saml==1.16.0
+ # via
+ # -c requirements/edx-platform-constraints.txt
+ # -r requirements/base.in
pytz==2026.1.post1
# via
# -c requirements/edx-platform-constraints.txt
@@ -643,6 +653,10 @@ webencodings==0.5.1
# via
# -c requirements/edx-platform-constraints.txt
# bleach
+xmlsec==1.3.14
+ # via
+ # -c requirements/edx-platform-constraints.txt
+ # python3-saml
# The following packages are considered to be unsafe in a requirements file:
# setuptools
diff --git a/requirements/test.txt b/requirements/test.txt
index b8a5a3552c..4a8fbbb211 100644
--- a/requirements/test.txt
+++ b/requirements/test.txt
@@ -346,6 +346,10 @@ invoke==3.0.3
# via
# -r requirements/test-master.txt
# paramiko
+isodate==0.7.2
+ # via
+ # -r requirements/test-master.txt
+ # python3-saml
jinja2==3.1.6
# via
# -r requirements/test-master.txt
@@ -364,7 +368,10 @@ kombu==5.6.2
# -r requirements/test-master.txt
# celery
lxml==5.3.2
- # via -r requirements/test-master.txt
+ # via
+ # -r requirements/test-master.txt
+ # python3-saml
+ # xmlsec
markupsafe==3.0.3
# via
# -r requirements/test-master.txt
@@ -493,6 +500,8 @@ python3-openid==3.2.0 ; python_version >= "3"
# via
# -r requirements/test-master.txt
# social-auth-core
+python3-saml==1.16.0
+ # via -r requirements/test-master.txt
pytz==2026.1.post1
# via
# -r requirements/test-master.txt
@@ -617,6 +626,10 @@ webencodings==0.5.1
# via
# -r requirements/test-master.txt
# bleach
+xmlsec==1.3.14
+ # via
+ # -r requirements/test-master.txt
+ # python3-saml
# The following packages are considered to be unsafe in a requirements file:
# setuptools
diff --git a/tests/test_enterprise/test_signals.py b/tests/test_enterprise/test_signals.py
index f9a526b603..2dc85c6c2e 100644
--- a/tests/test_enterprise/test_signals.py
+++ b/tests/test_enterprise/test_signals.py
@@ -27,9 +27,11 @@
SystemWideEnterpriseUserRoleAssignment,
)
from enterprise.signals import (
+ _unlink_enterprise_user_from_idp,
course_enrollment_changed_receiver,
create_enterprise_enrollment_receiver,
enterprise_unenrollment_receiver,
+ handle_social_auth_disconnect,
handle_user_post_save,
retire_user_from_pending_enterprise_customer_user,
)
@@ -41,6 +43,7 @@
EnterpriseCourseEnrollmentFactory,
EnterpriseCustomerCatalogFactory,
EnterpriseCustomerFactory,
+ EnterpriseCustomerIdentityProviderFactory,
EnterpriseCustomerUserFactory,
EnterpriseGroupMembershipFactory,
LearnerCreditEnterpriseCourseEnrollmentFactory,
@@ -1218,3 +1221,147 @@ def test_idempotent_when_already_retired(self):
self.pending_enterprise_user.refresh_from_db()
assert self.pending_enterprise_user.user_email == self.retired_email
+
+
+@mark.django_db
+class TestUnlinkEnterpriseUserFromIdp(unittest.TestCase):
+ """
+ Tests for _unlink_enterprise_user_from_idp helper.
+ """
+
+ def setUp(self):
+ super().setUp()
+ self.user = UserFactory()
+
+ @mock.patch('enterprise.signals.Registry')
+ @mock.patch('enterprise.signals.enterprise_customer_for_request')
+ def test_unlink_enterprise_user_from_idp(self, mock_customer_for_request, mock_registry):
+ """
+ Verify that the user is unlinked from the enterprise customer when
+ the IdP backend matches the enterprise identity provider.
+ """
+ customer_idp = EnterpriseCustomerIdentityProviderFactory.create(
+ provider_id='the-provider',
+ )
+ customer = customer_idp.enterprise_customer
+ EnterpriseCustomerUserFactory.create(
+ enterprise_customer=customer,
+ user_id=self.user.id,
+ )
+ mock_customer_for_request.return_value = {'uuid': customer.uuid}
+ mock_registry.get_enabled_by_backend_name.return_value = [
+ mock.Mock(provider_id='the-provider'),
+ ]
+ request = mock.Mock()
+
+ _unlink_enterprise_user_from_idp(request, self.user, idp_backend_name='the-backend-name')
+
+ assert EnterpriseCustomerUser.objects.filter(user_id=self.user.id).count() == 0
+
+ @mock.patch('enterprise.signals.Registry')
+ @mock.patch('enterprise.signals.enterprise_customer_for_request')
+ def test_unlink_enterprise_user_from_idp_no_customer_user(self, mock_customer_for_request, mock_registry):
+ """
+ Verify no error when user has no EnterpriseCustomerUser record.
+ """
+ customer_idp = EnterpriseCustomerIdentityProviderFactory.create(
+ provider_id='the-provider',
+ )
+ customer = customer_idp.enterprise_customer
+ mock_customer_for_request.return_value = {'uuid': customer.uuid}
+ mock_registry.get_enabled_by_backend_name.return_value = [
+ mock.Mock(provider_id='the-provider'),
+ ]
+ request = mock.Mock()
+
+ _unlink_enterprise_user_from_idp(request, self.user, idp_backend_name='the-backend-name')
+
+ assert EnterpriseCustomerUser.objects.filter(user_id=self.user.id).count() == 0
+
+ @mock.patch('enterprise.signals.Registry')
+ @mock.patch('enterprise.signals.enterprise_customer_for_request')
+ def test_no_op_when_enterprise_customer_is_none(self, mock_customer_for_request, mock_registry):
+ """
+ Verify that nothing happens when enterprise_customer_for_request returns None.
+ """
+ mock_customer_for_request.return_value = None
+ request = mock.Mock()
+
+ _unlink_enterprise_user_from_idp(request, self.user, idp_backend_name='the-backend-name')
+
+ mock_registry.get_enabled_by_backend_name.assert_not_called()
+
+ @mock.patch('enterprise.signals.Registry')
+ @mock.patch('enterprise.signals.enterprise_customer_for_request')
+ def test_no_op_when_no_matching_idps(self, mock_customer_for_request, mock_registry):
+ """
+ Verify that unlink_user is not called when registry provider IDs don't
+ match any EnterpriseCustomerIdentityProvider records.
+ """
+ customer_idp = EnterpriseCustomerIdentityProviderFactory.create(
+ provider_id='provider-A',
+ )
+ customer = customer_idp.enterprise_customer
+ EnterpriseCustomerUserFactory.create(
+ enterprise_customer=customer,
+ user_id=self.user.id,
+ )
+ mock_customer_for_request.return_value = {'uuid': customer.uuid}
+ # Registry returns a provider with a different ID than the one in the DB.
+ mock_registry.get_enabled_by_backend_name.return_value = [
+ mock.Mock(provider_id='provider-B'),
+ ]
+ request = mock.Mock()
+
+ _unlink_enterprise_user_from_idp(request, self.user, idp_backend_name='the-backend-name')
+
+ # The user should still be linked since no IdPs matched.
+ assert EnterpriseCustomerUser.objects.filter(user_id=self.user.id).count() == 1
+
+
+@mark.django_db
+class TestHandleSocialAuthDisconnect(unittest.TestCase):
+ """
+ Tests for handle_social_auth_disconnect signal handler.
+ """
+
+ @override_settings(FEATURES={'ENABLE_ENTERPRISE_INTEGRATION': True})
+ @mock.patch(
+ 'enterprise.signals._unlink_enterprise_user_from_idp',
+ )
+ def test_calls_unlink(self, mock_unlink):
+ """
+ Test that _unlink_enterprise_user_from_idp is called.
+ """
+ request = mock.MagicMock()
+ user = mock.MagicMock()
+ saml_backend = mock.MagicMock()
+
+ handle_social_auth_disconnect(
+ sender=None,
+ request=request,
+ user=user,
+ saml_backend=saml_backend,
+ )
+ mock_unlink.assert_called_once_with(request, user, saml_backend.name)
+
+ @override_settings(FEATURES={'ENABLE_ENTERPRISE_INTEGRATION': False})
+ @mock.patch(
+ 'enterprise.signals._unlink_enterprise_user_from_idp',
+ )
+ def test_skips_unlink_when_enterprise_integration_disabled(self, mock_unlink):
+ """
+ Test that _unlink_enterprise_user_from_idp is not called when
+ ENABLE_ENTERPRISE_INTEGRATION is False.
+ """
+ request = mock.MagicMock()
+ user = mock.MagicMock()
+ saml_backend = mock.MagicMock()
+
+ handle_social_auth_disconnect(
+ sender=None,
+ request=request,
+ user=user,
+ saml_backend=saml_backend,
+ )
+ mock_unlink.assert_not_called()
diff --git a/tests/test_settings.py b/tests/test_settings.py
new file mode 100644
index 0000000000..9f81c18134
--- /dev/null
+++ b/tests/test_settings.py
@@ -0,0 +1,126 @@
+"""
+Tests for enterprise plugin settings (enterprise/settings/common.py).
+"""
+import unittest
+from types import SimpleNamespace
+
+import pytest
+
+from enterprise.settings.common import plugin_settings
+
+
+class TestPluginSettingsPipelineInjection(unittest.TestCase):
+ """
+ Tests for SOCIAL_AUTH_PIPELINE step injection in plugin_settings().
+ """
+
+ def _make_settings(self, pipeline=None):
+ """Return a simple settings namespace with the given pipeline."""
+ return SimpleNamespace(SOCIAL_AUTH_PIPELINE=pipeline)
+
+ def _base_pipeline(self):
+ """Return a minimal pipeline list resembling the platform default."""
+ return [
+ 'common.djangoapps.third_party_auth.pipeline.parse_query_params',
+ 'social_core.pipeline.social_auth.social_details',
+ 'social_core.pipeline.social_auth.social_uid',
+ 'social_core.pipeline.social_auth.auth_allowed',
+ 'social_core.pipeline.social_auth.social_user',
+ 'common.djangoapps.third_party_auth.pipeline.associate_by_email_if_login_api',
+ 'common.djangoapps.third_party_auth.pipeline.associate_by_email_if_oauth',
+ 'common.djangoapps.third_party_auth.pipeline.get_username',
+ 'common.djangoapps.third_party_auth.pipeline.set_pipeline_timeout',
+ 'common.djangoapps.third_party_auth.pipeline.ensure_user_information',
+ 'social_core.pipeline.user.create_user',
+ 'social_core.pipeline.social_auth.associate_user',
+ 'social_core.pipeline.social_auth.load_extra_data',
+ 'social_core.pipeline.user.user_details',
+ 'common.djangoapps.third_party_auth.pipeline.user_details_force_sync',
+ 'common.djangoapps.third_party_auth.pipeline.set_id_verification_status',
+ 'common.djangoapps.third_party_auth.pipeline.set_logged_in_cookies',
+ 'common.djangoapps.third_party_auth.pipeline.login_analytics',
+ 'common.djangoapps.third_party_auth.pipeline.ensure_redirect_url_is_safe',
+ ]
+
+ def test_inserts_email_step_before_oauth(self):
+ """
+ enterprise_associate_by_email should be inserted immediately before
+ associate_by_email_if_oauth.
+ """
+ pipeline = self._base_pipeline()
+ settings = self._make_settings(pipeline=pipeline)
+ plugin_settings(settings)
+
+ email_step = 'enterprise.tpa_pipeline.enterprise_associate_by_email'
+ oauth_step = 'common.djangoapps.third_party_auth.pipeline.associate_by_email_if_oauth'
+ assert email_step in pipeline
+ assert pipeline.index(email_step) == pipeline.index(oauth_step) - 1
+
+ def test_inserts_logistration_step_after_associate_user(self):
+ """
+ handle_enterprise_logistration should be inserted immediately after
+ associate_user.
+ """
+ pipeline = self._base_pipeline()
+ settings = self._make_settings(pipeline=pipeline)
+ plugin_settings(settings)
+
+ logistration_step = 'enterprise.tpa_pipeline.handle_enterprise_logistration'
+ associate_step = 'social_core.pipeline.social_auth.associate_user'
+ assert logistration_step in pipeline
+ assert pipeline.index(logistration_step) == pipeline.index(associate_step) + 1
+
+ def test_idempotent(self):
+ """
+ Calling plugin_settings() twice should not duplicate pipeline entries.
+ """
+ pipeline = self._base_pipeline()
+ settings = self._make_settings(pipeline=pipeline)
+ plugin_settings(settings)
+ plugin_settings(settings)
+
+ email_step = 'enterprise.tpa_pipeline.enterprise_associate_by_email'
+ logistration_step = 'enterprise.tpa_pipeline.handle_enterprise_logistration'
+ assert pipeline.count(email_step) == 1
+ assert pipeline.count(logistration_step) == 1
+
+ def test_raises_when_oauth_reference_step_missing(self):
+ """
+ If the reference step (associate_by_email_if_oauth) is missing from
+ the pipeline, plugin_settings should raise ValueError.
+ """
+ pipeline = [
+ 'social_core.pipeline.social_auth.social_user',
+ 'social_core.pipeline.social_auth.associate_user',
+ ]
+ settings = self._make_settings(pipeline=pipeline)
+ with pytest.raises(ValueError):
+ plugin_settings(settings)
+
+ def test_raises_when_associate_user_reference_step_missing(self):
+ """
+ If the reference step (associate_user) is missing from the pipeline,
+ plugin_settings should raise ValueError.
+ """
+ pipeline = [
+ 'common.djangoapps.third_party_auth.pipeline.associate_by_email_if_oauth',
+ ]
+ settings = self._make_settings(pipeline=pipeline)
+ with pytest.raises(ValueError):
+ plugin_settings(settings)
+
+ def test_no_pipeline_attribute(self):
+ """
+ If settings has no SOCIAL_AUTH_PIPELINE, plugin_settings is a no-op.
+ """
+ settings = SimpleNamespace()
+ # Should not raise
+ plugin_settings(settings)
+
+ def test_pipeline_is_none(self):
+ """
+ If SOCIAL_AUTH_PIPELINE is None, plugin_settings is a no-op.
+ """
+ settings = self._make_settings(pipeline=None)
+ # Should not raise
+ plugin_settings(settings)
diff --git a/tests/test_tpa_pipeline.py b/tests/test_tpa_pipeline.py
index c1e7bda8e6..af5a63b9cf 100644
--- a/tests/test_tpa_pipeline.py
+++ b/tests/test_tpa_pipeline.py
@@ -11,9 +11,14 @@
from django.contrib.messages.storage import fallback
from django.contrib.sessions.backends import cache
from django.test import RequestFactory
+from django.test.utils import override_settings
from enterprise.models import EnterpriseCustomerUser
-from enterprise.tpa_pipeline import get_enterprise_customer_for_running_pipeline, handle_enterprise_logistration
+from enterprise.tpa_pipeline import (
+ enterprise_associate_by_email,
+ get_enterprise_customer_for_running_pipeline,
+ handle_enterprise_logistration,
+)
from test_utils.factories import (
EnterpriseCustomerFactory,
EnterpriseCustomerIdentityProviderFactory,
@@ -312,3 +317,216 @@ def test_enterprise_logistration_validates_sso_orchestration_config(self):
assert handle_enterprise_logistration.__wrapped__(backend, self.user) is None
customer_sso_integration_config.refresh_from_db()
assert customer_sso_integration_config.validated_at is not None
+
+
+@mark.django_db
+class TestEnterpriseAssociateByEmail(unittest.TestCase):
+ """
+ Tests for the enterprise_associate_by_email pipeline step.
+ """
+
+ def setUp(self):
+ super().setUp()
+ self.idp = EnterpriseCustomerIdentityProviderFactory(provider_id='test-provider-id')
+ self.enterprise_customer = self.idp.enterprise_customer
+ self.user = UserFactory(is_active=True, email='existing@example.com')
+ self.request_factory = RequestFactory()
+ # is_saml_provider is imported from openedx-platform and is None in the test env.
+ self.is_saml_provider_patcher = mock.patch('enterprise.tpa_pipeline.is_saml_provider')
+ self.mock_is_saml_provider = self.is_saml_provider_patcher.start()
+ self.saml_provider_mock = mock.MagicMock(provider_id='test-provider-id')
+ self.mock_is_saml_provider.return_value = (True, self.saml_provider_mock)
+
+ def tearDown(self):
+ self.is_saml_provider_patcher.stop()
+ super().tearDown()
+
+ def _make_strategy(self, provider_id='test-provider-id'):
+ """Return a mock strategy whose request.backend has the given provider_id."""
+ backend_mock = mock.MagicMock(provider_id=provider_id)
+ strategy_mock = mock.MagicMock()
+ strategy_mock.request.backend = backend_mock
+ return strategy_mock
+
+ def test_returns_none_when_user_already_set(self):
+ """
+ If the pipeline step already has a user, return None (no-op).
+ """
+ strategy = self._make_strategy()
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={'email': self.user.email},
+ user=self.user,
+ )
+ assert result is None
+
+ def test_returns_none_when_no_email(self):
+ """
+ If details has no email, return None.
+ """
+ strategy = self._make_strategy()
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={},
+ user=None,
+ )
+ assert result is None
+
+ def test_returns_none_when_no_matching_user(self):
+ """
+ If no user exists with the given email, return None.
+ """
+ strategy = self._make_strategy()
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={'email': 'nosuchuser@example.com'},
+ user=None,
+ )
+ assert result is None
+
+ def test_returns_none_when_no_provider_id(self):
+ """
+ If the SAML provider has no provider_id, return None.
+ """
+ self.saml_provider_mock.provider_id = None
+ strategy = self._make_strategy(provider_id=None)
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={'email': self.user.email},
+ user=None,
+ )
+ assert result is None
+
+ def test_returns_user_when_enterprise_customer_user_matches(self):
+ """
+ If the email matches an existing user who is linked to the enterprise for this provider,
+ delegate to social_core's associate_by_email and return the association response.
+ """
+ EnterpriseCustomerUser.objects.create(
+ enterprise_customer=self.enterprise_customer,
+ user_id=self.user.id,
+ )
+ strategy = self._make_strategy(provider_id='test-provider-id')
+ with mock.patch('enterprise.tpa_pipeline.associate_by_email') as mock_assoc:
+ mock_assoc.return_value = {'user': self.user}
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={'email': self.user.email},
+ user=None,
+ )
+ assert result == {'user': self.user}
+ mock_assoc.assert_called_once()
+
+ def test_returns_none_when_user_not_linked_to_enterprise(self):
+ """
+ If the email matches a user but that user is NOT an EnterpriseCustomerUser for the
+ provider's enterprise, return None.
+ """
+ # User exists but is not linked to the enterprise customer for this IdP.
+ strategy = self._make_strategy(provider_id='test-provider-id')
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={'email': self.user.email},
+ user=None,
+ )
+ assert result is None
+
+ def test_returns_none_when_user_is_inactive(self):
+ """
+ If the email matches an existing user who is inactive, return None
+ even if they are linked to the enterprise.
+ """
+ inactive_user = UserFactory(is_active=False, email='inactive@example.com')
+ EnterpriseCustomerUser.objects.create(
+ enterprise_customer=self.enterprise_customer,
+ user_id=inactive_user.id,
+ )
+ strategy = self._make_strategy(provider_id='test-provider-id')
+ with mock.patch('enterprise.tpa_pipeline.associate_by_email') as mock_assoc:
+ mock_assoc.return_value = {'user': inactive_user}
+ with mock.patch('enterprise.tpa_pipeline.log') as mock_log:
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={'email': 'inactive@example.com'},
+ user=None,
+ )
+ assert result is None
+ # Should log the inactive user info message.
+ log_messages = [call[0][0] for call in mock_log.info.call_args_list]
+ assert any(
+ '[Multiple_SSO_SAML_Accounts_Association_to_User] User association account is not'
+ in msg for msg in log_messages
+ )
+
+ def test_logs_exception_on_unexpected_error(self):
+ """
+ If an unexpected error occurs during enterprise user lookup, log the
+ exception and return None.
+ """
+ EnterpriseCustomerUser.objects.create(
+ enterprise_customer=self.enterprise_customer,
+ user_id=self.user.id,
+ )
+ strategy = self._make_strategy(provider_id='test-provider-id')
+ with mock.patch(
+ 'enterprise.tpa_pipeline.EnterpriseCustomerIdentityProvider.objects'
+ ) as mock_objects:
+ mock_objects.get.side_effect = RuntimeError('boom')
+ with mock.patch('enterprise.tpa_pipeline.log') as mock_log:
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={'email': self.user.email},
+ user=None,
+ )
+ assert result is None
+ mock_log.exception.assert_called_once()
+ assert '[Multiple_SSO_SAML_Accounts_Association_to_User]' in mock_log.exception.call_args[0][0]
+
+ @override_settings(FEATURES={'ENABLE_ENTERPRISE_INTEGRATION': False})
+ def test_returns_none_when_enterprise_disabled(self):
+ """
+ If ENABLE_ENTERPRISE_INTEGRATION is False, return None without
+ querying enterprise models.
+ """
+ strategy = self._make_strategy()
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={'email': self.user.email},
+ user=None,
+ )
+ assert result is None
+ # is_saml_provider should never be called when enterprise is disabled.
+ self.mock_is_saml_provider.assert_not_called()
+
+ def test_returns_none_when_associate_by_email_returns_none(self):
+ """
+ If the user is an enterprise customer user but associate_by_email
+ returns None (e.g. multiple users with same email), return None.
+ """
+ EnterpriseCustomerUser.objects.create(
+ enterprise_customer=self.enterprise_customer,
+ user_id=self.user.id,
+ )
+ strategy = self._make_strategy(provider_id='test-provider-id')
+ with mock.patch('enterprise.tpa_pipeline.associate_by_email') as mock_assoc:
+ mock_assoc.return_value = None
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={'email': self.user.email},
+ user=None,
+ )
+ assert result is None
+
+ def test_returns_none_for_non_saml_provider(self):
+ """
+ If the backend is not a SAML provider, return None without
+ querying enterprise models.
+ """
+ self.mock_is_saml_provider.return_value = (False, None)
+ strategy = self._make_strategy()
+ result = enterprise_associate_by_email(
+ strategy=strategy,
+ details={'email': self.user.email},
+ user=None,
+ )
+ assert result is None