diff --git a/docs/SUPPORT_GUIDE_SSO_ORPHANED_MAPPINGS.md b/docs/SUPPORT_GUIDE_SSO_ORPHANED_MAPPINGS.md new file mode 100644 index 000000000..64e87b0d7 --- /dev/null +++ b/docs/SUPPORT_GUIDE_SSO_ORPHANED_MAPPINGS.md @@ -0,0 +1,406 @@ +# Support Guide: Resolving Orphaned SSO Group Mappings + +**Issue Type:** Okta Push Groups / SCIM Group Provisioning Failures +**Error Pattern:** "stale externalId" or "orphaned mapping" +**Severity:** Medium - Blocks specific group from being provisioned +**Related:** LINTEST-425 + +## Quick Reference + +### Symptoms Checklist + +- [ ] Customer reports Okta Push Groups failing for a specific group +- [ ] Error message mentions "externalId" or "unable to get the group" +- [ ] Other groups with similar settings work fine +- [ ] Issue persists after unlinking/re-linking in Okta +- [ ] Issue persists after deleting and recreating the Atlan group + +If 3+ of these apply, this is likely an orphaned SSO mapping issue. + +### Quick Resolution (5 minutes) + +```bash +# 1. Set up environment +export ATLAN_BASE_URL="https://customer-tenant.atlan.com" +export ATLAN_API_KEY="" + +# 2. Run diagnostic +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta \ + --group-name + +# 3. If orphaned mapping found, clean it up +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta + +# 4. Verify cleanup +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta \ + --group-name +``` + +## Detailed Resolution Steps + +### Step 1: Gather Information + +Collect the following from the customer: + +1. **Tenant Details:** + - Tenant URL (e.g., `https://apex.atlan.com`) + - Affected group name (e.g., `grpAtlanProdWorkflowAdmin`) + - SSO provider (Okta, Azure AD, JumpCloud, etc.) + +2. **Error Details:** + - Full error message from the SSO provider + - Screenshot of the error (if available) + - When did the issue start? + +3. **What Has Been Tried:** + - Have they unlinked/re-linked the group? + - Have they deleted and recreated the Atlan group? + - Are there any recent changes to SSO configuration? + +### Step 2: Run Diagnostics + +#### Option A: Using the Python SDK (Recommended) + +```bash +# Install pyatlan if not already available +pip install pyatlan + +# Set environment variables +export ATLAN_BASE_URL="https://customer-tenant.atlan.com" +export ATLAN_API_KEY="" + +# Run diagnostic for the specific group +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta \ + --group-name grpAtlanProdWorkflowAdmin +``` + +**Expected Output:** + +If orphaned mapping exists: +``` +šŸ”“ ORPHANED: mapping-id-123 + Mapping Name: group-id-old--1234567890 + Group Name: grpAtlanProdWorkflowAdmin + SSO Group: grpAtlanProdWorkflowAdmin + Reason: Group ID mismatch: mapping has 'group-id-old' but current group has 'group-id-new' +``` + +If no issues found: +``` +āœ… VALID: mapping-id-456 + Mapping Name: group-id-current--1234567891 + Group Name: grpAtlanProdWorkflowAdmin + SSO Group: grpAtlanProdWorkflowAdmin +``` + +#### Option B: Manual API Check + +```bash +# Get all SSO mappings +curl -X GET \ + "${ATLAN_BASE_URL}/api/service/auth/admin/realms/default/identity-provider/instances/okta/mappers" \ + -H "Authorization: Bearer ${ATLAN_API_KEY}" + +# Get all groups +curl -X GET \ + "${ATLAN_BASE_URL}/api/service/groups" \ + -H "Authorization: Bearer ${ATLAN_API_KEY}" +``` + +### Step 3: Clean Up Orphaned Mappings + +#### Automated Cleanup (Recommended) + +```bash +# Interactive cleanup (asks for confirmation) +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta + +# Non-interactive (use with caution) +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta \ + --non-interactive +``` + +#### Manual Cleanup + +If you prefer to delete specific mappings manually: + +```python +from pyatlan.client.atlan import AtlanClient + +client = AtlanClient() + +# Delete the orphaned mapping +client.sso.delete_group_mapping( + sso_alias="okta", + group_map_id="mapping-id-to-delete" +) + +print("āœ… Deleted orphaned mapping") +``` + +Or via REST API: + +```bash +curl -X POST \ + "${ATLAN_BASE_URL}/api/service/auth/admin/realms/default/identity-provider/instances/okta/mappers/${MAPPING_ID}/delete" \ + -H "Authorization: Bearer ${ATLAN_API_KEY}" +``` + +### Step 4: Verify and Re-link in Okta + +After cleaning up the orphaned mapping: + +1. **Verify the Atlan group exists:** + ```bash + python -c " + from pyatlan.client.atlan import AtlanClient + client = AtlanClient() + groups = client.group.get_by_name(alias='grpAtlanProdWorkflowAdmin') + if groups and groups.records: + print(f'āœ… Group exists: {groups.records[0].id}') + else: + print('āŒ Group not found - needs to be created') + " + ``` + +2. **In Okta:** + - Go to Applications → Atlan → Push Groups + - **Unlink** the affected group (do not delete in Okta) + - **Re-link** using "Link Group" (NOT "Create") + - Select the existing Atlan group from the dropdown + +3. **Test the push:** + - Save the push group configuration + - Trigger a push + - Verify the group members are synced + +### Step 5: Follow-Up + +1. **Run diagnostic again** to confirm no orphaned mappings remain: + ```bash + python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta + ``` + +2. **Check for other affected groups:** + - If this is a widespread issue, run diagnostics without `--group-name` + - Clean up all orphaned mappings + +3. **Document the incident:** + - Note the group name(s) affected + - Record the orphaned mapping ID(s) that were deleted + - Update the customer ticket with resolution details + +## Common Scenarios + +### Scenario 1: Group Deleted and Recreated + +**Situation:** Customer deleted an Atlan group and created a new one with the same name. + +**Problem:** The old SSO mapping still references the old group ID. + +**Solution:** +1. Run diagnostic to identify the orphaned mapping +2. Delete the orphaned mapping +3. In Okta, unlink and re-link the group + +### Scenario 2: SSO Configuration Changed + +**Situation:** Customer changed SSO provider (e.g., from Azure to Okta) or reconfigured SSO. + +**Problem:** Old mappings from the previous SSO configuration still exist. + +**Solution:** +1. Run diagnostic for BOTH SSO providers (old and new) +2. Clean up orphaned mappings from the old provider +3. Verify mappings for the new provider are correct + +### Scenario 3: Multiple Groups Affected + +**Situation:** Multiple groups are experiencing the same issue. + +**Problem:** Likely a systemic issue from a bulk group operation. + +**Solution:** +1. Run diagnostic for all groups: `--mode diagnose --sso-alias okta` +2. Use non-interactive cleanup: `--mode cleanup --sso-alias okta --non-interactive` +3. Have customer re-link all affected groups in Okta + +### Scenario 4: Recurring Issue + +**Situation:** The same group keeps getting orphaned mappings. + +**Problem:** Customer is likely using "Create" instead of "Link Group" in Okta, or there's an automation issue. + +**Solution:** +1. Clean up the orphaned mapping +2. **Educate the customer** on proper linking procedure +3. Check if any automation or scripts are incorrectly managing groups +4. Consider creating a monitoring alert for this specific group + +## Escalation Criteria + +Escalate to engineering if: + +- [ ] Diagnostic script fails to run or errors out +- [ ] Cleanup doesn't resolve the issue (orphaned mapping still present after deletion) +- [ ] Issue recurs immediately after cleanup +- [ ] No orphaned mappings found but the issue persists +- [ ] Backend logs show errors related to Keycloak or SCIM provisioning +- [ ] Multiple customers reporting the same issue + +## Prevention Recommendations + +Share these best practices with customers: + +### For Support Teams + +1. **Regular Audits:** + - Run diagnostic monthly for all SSO providers + - Clean up orphaned mappings proactively + +2. **Documentation:** + - Maintain a runbook for common SSO issues + - Document customer's SSO configuration + +3. **Monitoring:** + - Set up alerts for SSO provisioning failures + - Track orphaned mapping incidents + +### For Customers + +1. **Proper Group Deletion:** + - Before deleting a group, unlink it from SSO first + - Use the diagnostic script before bulk group operations + +2. **Correct Linking Procedure:** + - Always use "Link Group" (not "Create") when re-linking + - Verify the group exists in Atlan before linking + +3. **Testing:** + - Test group pushes in a non-production tenant first + - Verify membership sync after any SSO configuration changes + +## Technical Details + +### What Are Orphaned Mappings? + +An SSO group mapping (Keycloak Identity Provider Mapper) creates a link between: +- An Atlan group (identified by group ID) +- An SSO group (identified by group name in the SSO provider) + +When a group is deleted or recreated, the new group gets a new ID, but the old mapping still references the old ID. This is an "orphaned" mapping. + +### Why Does Okta Fail? + +When Okta tries to push a group: +1. Okta sends a SCIM request with the group's `externalId` +2. Atlan/Keycloak looks up the group by `externalId` +3. If the mapping is orphaned, Keycloak can't find the group +4. The push fails with "unable to get the group" error + +### What Does Cleanup Do? + +The cleanup process: +1. Identifies mappings that reference non-existent groups +2. Deletes these orphaned mappings from Keycloak +3. Allows Okta to create a fresh mapping on the next push + +## API Reference + +### Diagnostic Script + +``` +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings [OPTIONS] + +Options: + --mode {diagnose,cleanup,list} Operation mode (required) + --sso-alias TEXT SSO provider alias (required) + --group-name TEXT Specific group to check (optional) + --non-interactive Skip prompts in cleanup mode +``` + +### SSO Client Methods + +```python +from pyatlan.client.atlan import AtlanClient + +client = AtlanClient() + +# Get all SSO group mappings +mappings = client.sso.get_all_group_mappings(sso_alias="okta") + +# Get specific mapping +mapping = client.sso.get_group_mapping( + sso_alias="okta", + group_map_id="mapping-id" +) + +# Delete mapping +client.sso.delete_group_mapping( + sso_alias="okta", + group_map_id="mapping-id" +) + +# Create new mapping +group = client.group.get_by_name(alias="MyGroup").records[0] +new_mapping = client.sso.create_group_mapping( + sso_alias="okta", + atlan_group=group, + sso_group_name="okta-group-name" +) +``` + +## Troubleshooting the Diagnostic Script + +### Script Won't Run + +**Problem:** `No module named pyatlan` + +**Solution:** +```bash +pip install pyatlan +# or +pip install --upgrade pyatlan +``` + +### Authentication Errors + +**Problem:** `401 Unauthorized` or `403 Forbidden` + +**Solution:** +- Verify `ATLAN_BASE_URL` is correct (include `https://`) +- Verify `ATLAN_API_KEY` has admin permissions +- Check API key hasn't expired + +### No Orphaned Mappings Found + +**Problem:** Diagnostic shows all mappings as valid, but issue persists + +**Possible Causes:** +- Wrong SSO alias (check if it's `okta`, `azure`, `jumpcloud`, etc.) +- Wrong group name (check exact name including capitalization) +- Issue is not related to orphaned mappings (escalate) + +## Related Documentation + +- [SSO Troubleshooting Guide](./sso_troubleshooting.md) +- [SSO Client API](./client/sso.rst) +- [Group Management API](./client/group.rst) + +## Changelog + +- **2025-02-11:** Initial support guide created for LINTEST-425 diff --git a/docs/sso_troubleshooting.md b/docs/sso_troubleshooting.md new file mode 100644 index 000000000..91740913f --- /dev/null +++ b/docs/sso_troubleshooting.md @@ -0,0 +1,400 @@ +# SSO Group Mapping Troubleshooting Guide + +## Overview + +This guide helps you diagnose and resolve issues with SSO (Single Sign-On) group mappings, particularly when using Okta Push Groups or similar SCIM-based group provisioning systems. + +## Common Issue: Stale externalId / Orphaned Mapping + +### Symptoms + +When attempting to push a group from your SSO provider (e.g., Okta) to Atlan, you receive an error similar to: + +``` +Failed on XX-XX-XXXX XX:XX:XX PM UTC: Unable to update Group Push mapping target App group +``: Error while trying to get the group `` with the externalId +`` and id `com.saasure.db.dto.platform.entity.AppGroup@...` +``` + +### Key Indicators + +- āœ… Other groups with similar settings push successfully +- āœ… The error consistently mentions a specific `externalId` +- āœ… The issue persists even after: + - Removing the SSO group mapping in Atlan + - Creating a new Atlan group with a matching name + - Unlinking and re-linking the group from Okta + +### Root Cause + +This issue occurs when there is a **stale mapping** between your SSO provider (Okta) and Atlan. Specifically: + +1. An SSO group mapping was previously created, establishing a link between an Okta group and an Atlan group +2. The Atlan group was deleted or recreated, or the SSO mapping was removed +3. However, Okta still has a record of the old `externalId` for that group +4. When Okta tries to push the group again, it uses the old `externalId` which Atlan cannot resolve + +The SSO group mapping (identity provider mapper in Keycloak) is stored separately from the group itself, and deleting one doesn't automatically clean up the other. + +## Diagnosis + +### Using the Diagnostic Script + +We provide a diagnostic utility to identify orphaned SSO group mappings. This script is part of the Python SDK. + +#### Prerequisites + +```bash +# Install the Atlan Python SDK +pip install pyatlan + +# Set up environment variables +export ATLAN_BASE_URL="https://your-tenant.atlan.com" +export ATLAN_API_KEY="your-api-key" +``` + +#### Diagnose All Mappings + +To check for orphaned mappings across all groups: + +```bash +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta +``` + +#### Diagnose Specific Group + +To check a specific group that's failing: + +```bash +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta \ + --group-name grpAtlanProdWorkflowAdmin +``` + +#### List All SSO Mappings + +To see all current SSO group mappings: + +```bash +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode list \ + --sso-alias okta +``` + +### Manual Diagnosis + +If you prefer to diagnose manually using the Python SDK: + +```python +from pyatlan.client.atlan import AtlanClient + +# Initialize client +client = AtlanClient() + +# Get all SSO group mappings +sso_alias = "okta" # or "azure", "jumpcloud", etc. +mappings = client.sso.get_all_group_mappings(sso_alias=sso_alias) + +# Get all Atlan groups +groups_response = client.group.get_all() +groups = {g.id: g for g in groups_response if g.id} + +# Check each mapping +for mapping in mappings: + if not mapping.name or not mapping.config: + continue + + # Extract group ID from mapping name (format: --) + mapping_group_id = mapping.name.split("--")[0] if "--" in mapping.name else None + + # Check if group still exists + if mapping_group_id and mapping_group_id not in groups: + print(f"āš ļø ORPHANED: Mapping {mapping.id} references non-existent group {mapping_group_id}") + print(f" SSO Group: {mapping.config.attribute_value}") + print(f" Atlan Group: {mapping.config.group_name}") +``` + +## Resolution + +### Option 1: Automated Cleanup (Recommended) + +Use the diagnostic script to automatically clean up orphaned mappings: + +```bash +# Interactive cleanup (prompts for confirmation for each mapping) +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta + +# Non-interactive cleanup (automatically deletes all orphaned mappings) +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta \ + --non-interactive +``` + +**āš ļø Warning:** Non-interactive mode will delete ALL orphaned mappings without confirmation. Use with caution! + +### Option 2: Manual Cleanup + +#### Using the Python SDK + +```python +from pyatlan.client.atlan import AtlanClient + +client = AtlanClient() + +# Find the orphaned mapping ID (from diagnosis step) +orphaned_mapping_id = "your-mapping-id-here" +sso_alias = "okta" + +# Delete the orphaned mapping +client.sso.delete_group_mapping( + sso_alias=sso_alias, + group_map_id=orphaned_mapping_id +) + +print(f"āœ… Deleted orphaned mapping {orphaned_mapping_id}") +``` + +#### Using the REST API + +If you need to clean up mappings directly via API: + +```bash +# Get your API token and base URL +ATLAN_BASE_URL="https://your-tenant.atlan.com" +ATLAN_API_KEY="your-api-key" +SSO_ALIAS="okta" +MAPPING_ID="mapping-id-to-delete" + +# Delete the mapping +curl -X POST \ + "${ATLAN_BASE_URL}/api/service/auth/admin/realms/default/identity-provider/instances/${SSO_ALIAS}/mappers/${MAPPING_ID}/delete" \ + -H "Authorization: Bearer ${ATLAN_API_KEY}" \ + -H "Content-Type: application/json" +``` + +### Post-Cleanup Steps + +After cleaning up the orphaned mappings: + +1. **In Atlan:** + - Verify the Atlan group exists: Settings → Access → Groups + - If needed, create the group: Settings → Access → Groups → + New Group + +2. **In Okta:** + - Go to Applications → Atlan → Push Groups + - **Unlink** the affected group (do not delete in Okta) + - **Re-link** the group using "Link Group" (NOT "Create") + - Choose the existing Atlan group from the dropdown + +3. **Test:** + - Try pushing the group again + - The error should now be resolved + - Verify group membership is correctly synced + +## Prevention + +To prevent this issue in the future: + +### 1. Proper Group Deletion Workflow + +When deleting a group that has SSO mapping: + +```python +from pyatlan.client.atlan import AtlanClient + +client = AtlanClient() +sso_alias = "okta" +group_id = "group-guid-here" + +# Step 1: Find and delete SSO mappings first +all_mappings = client.sso.get_all_group_mappings(sso_alias=sso_alias) +for mapping in all_mappings: + if mapping.name and group_id in mapping.name: + print(f"Deleting SSO mapping: {mapping.id}") + client.sso.delete_group_mapping( + sso_alias=sso_alias, + group_map_id=mapping.id + ) + +# Step 2: Then delete the group +client.group.purge(guid=group_id) +print(f"āœ… Group and SSO mappings deleted successfully") +``` + +### 2. Re-linking Groups + +When re-linking a group after unlinking: + +- āœ… **DO:** Use "Link Group" and select the existing Atlan group +- āŒ **DON'T:** Use "Create" which creates a new group and new mapping +- āŒ **DON'T:** Delete and recreate the Atlan group without cleaning up SSO mappings + +### 3. Regular Audits + +Periodically run the diagnostic script to identify orphaned mappings: + +```bash +# Add to your maintenance routine +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta +``` + +## Advanced Scenarios + +### Multiple SSO Providers + +If you have multiple SSO providers (e.g., Okta and Azure AD), run diagnostics for each: + +```bash +for sso in okta azure jumpcloud; do + echo "Checking $sso..." + python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias $sso +done +``` + +### Scripted Cleanup + +For large-scale cleanups across multiple tenants: + +```python +from pyatlan.client.atlan import AtlanClient +from pyatlan.samples.sso.diagnose_orphaned_group_mappings import SSOGroupMappingDiagnostic + +# Initialize +client = AtlanClient() +diagnostic = SSOGroupMappingDiagnostic(client) + +# Run diagnosis +results = diagnostic.diagnose_orphaned_mappings(sso_alias="okta") + +# Automated cleanup (be careful!) +if results["orphaned"]: + print(f"Found {len(results['orphaned'])} orphaned mappings") + deleted = diagnostic.cleanup_orphaned_mappings( + sso_alias="okta", + interactive=False # Automatic deletion + ) + print(f"Deleted {deleted} mappings") +``` + +### Monitoring and Alerting + +Set up monitoring to detect this issue early: + +```python +import logging +from pyatlan.client.atlan import AtlanClient +from pyatlan.samples.sso.diagnose_orphaned_group_mappings import SSOGroupMappingDiagnostic + +def check_orphaned_mappings(sso_alias: str) -> bool: + """ + Check for orphaned SSO mappings and return True if any found. + Suitable for integration with monitoring systems. + """ + client = AtlanClient() + diagnostic = SSOGroupMappingDiagnostic(client) + + results = diagnostic.diagnose_orphaned_mappings(sso_alias=sso_alias) + + orphaned_count = len(results["orphaned"]) + + if orphaned_count > 0: + logging.warning(f"Found {orphaned_count} orphaned SSO mappings for {sso_alias}") + return True + + return False + +# Usage in monitoring script +if check_orphaned_mappings("okta"): + # Send alert to your monitoring system + # e.g., send_pagerduty_alert(), send_slack_message(), etc. + pass +``` + +## API Reference + +### SSO Client Methods + +All SSO operations are available through `client.sso`: + +```python +from pyatlan.client.atlan import AtlanClient +from pyatlan.model.group import AtlanGroup + +client = AtlanClient() + +# Create a new SSO group mapping +group = client.group.get_by_name(alias="MyGroup").records[0] +mapping = client.sso.create_group_mapping( + sso_alias="okta", + atlan_group=group, + sso_group_name="okta-group-name" +) + +# Get all SSO group mappings +all_mappings = client.sso.get_all_group_mappings(sso_alias="okta") + +# Get specific SSO group mapping +mapping = client.sso.get_group_mapping( + sso_alias="okta", + group_map_id="mapping-id" +) + +# Update SSO group mapping +updated = client.sso.update_group_mapping( + sso_alias="okta", + atlan_group=group, + group_map_id="mapping-id", + sso_group_name="new-okta-group-name" +) + +# Delete SSO group mapping +client.sso.delete_group_mapping( + sso_alias="okta", + group_map_id="mapping-id" +) +``` + +## Support Escalation + +If the above steps don't resolve your issue, escalate to Atlan Support with: + +1. **Environment Details:** + - Tenant URL + - SSO provider (Okta, Azure AD, etc.) + - Affected group name(s) + +2. **Diagnostic Output:** + ```bash + python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta > diagnostic_output.txt + ``` + +3. **Error Messages:** + - Full error message from Okta/SSO provider + - Any relevant logs from Atlan + +4. **Steps Already Taken:** + - List what you've already tried + - Whether cleanup script was run + - Results of cleanup attempts + +## Related Documentation + +- [SSO Group Mappings API](../client/sso.rst) +- [Group Management API](../client/group.rst) +- [Atlan SSO Configuration](https://ask.atlan.com/hc/en-us/articles/sso-setup) + +## Changelog + +- **2025-02-11:** Initial documentation for LINTEST-425 (Okta Push Groups stale externalId issue) diff --git a/pyatlan/client/aio/sso.py b/pyatlan/client/aio/sso.py index d68f4e04f..5e92e9e98 100644 --- a/pyatlan/client/aio/sso.py +++ b/pyatlan/client/aio/sso.py @@ -3,6 +3,7 @@ from __future__ import annotations +import logging from typing import TYPE_CHECKING, List from pydantic.v1 import validate_arguments @@ -23,6 +24,8 @@ if TYPE_CHECKING: pass +logger = logging.getLogger(__name__) + class AsyncSSOClient: """ @@ -63,14 +66,35 @@ async def create_group_mapping( :param atlan_group: existing Atlan group. :param sso_group_name: name of the SSO group. :raises AtlanError: on any error during API invocation. + :raises InvalidRequestError: if the group doesn't have required fields or mapping already exists. :returns: created SSO group mapping instance. """ + # Validate the group has required fields + if not atlan_group.id: + raise ErrorCode.SSO_MAPPING_VALIDATION_ERROR.exception_with_parameters( + "Atlan group must have an 'id' field populated", + sso_alias, + ) + if not atlan_group.name: + raise ErrorCode.SSO_MAPPING_VALIDATION_ERROR.exception_with_parameters( + "Atlan group must have a 'name' field populated", + sso_alias, + ) + + logger.info( + f"Creating SSO group mapping: {atlan_group.alias} (ID: {atlan_group.id}) " + f"<-> {sso_group_name} (SSO: {sso_alias})" + ) + await self._check_existing_group_mappings(sso_alias, atlan_group) endpoint, request_obj = SSOCreateGroupMapping.prepare_request( sso_alias, atlan_group, sso_group_name ) raw_json = await self._client._call_api(endpoint, request_obj=request_obj) - return SSOCreateGroupMapping.process_response(raw_json) + result = SSOCreateGroupMapping.process_response(raw_json) + + logger.info(f"Successfully created SSO group mapping with ID: {result.id}") + return result @validate_arguments async def update_group_mapping( @@ -130,13 +154,28 @@ async def delete_group_mapping(self, sso_alias: str, group_map_id: str) -> None: """ Deletes an existing Atlan SSO group mapping. + Note: This only deletes the SSO mapping (identity provider mapper). + If you're experiencing issues with Okta Push Groups failing with + stale externalId errors, you may need to run the diagnostic script + to identify and clean up orphaned mappings. + :param sso_alias: name of the SSO provider. :param group_map_id: existing SSO group map identifier. :raises AtlanError: on any error during API invocation. :returns: an empty response (`None`). """ + logger.info(f"Deleting SSO group mapping: {group_map_id} (SSO: {sso_alias})") + endpoint, request_obj = SSODeleteGroupMapping.prepare_request( sso_alias, group_map_id ) raw_json = await self._client._call_api(endpoint, request_obj=request_obj) + + logger.info(f"Successfully deleted SSO group mapping: {group_map_id}") + logger.debug( + "If you're experiencing Okta Push Groups issues with stale externalId, " + "run: python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings " + f"--mode diagnose --sso-alias {sso_alias}" + ) + return raw_json diff --git a/pyatlan/client/sso.py b/pyatlan/client/sso.py index 5c96f23ba..9bbcbf260 100644 --- a/pyatlan/client/sso.py +++ b/pyatlan/client/sso.py @@ -1,3 +1,4 @@ +import logging from typing import List from pydantic.v1 import validate_arguments @@ -15,6 +16,8 @@ from pyatlan.model.group import AtlanGroup from pyatlan.model.sso import SSOMapper +logger = logging.getLogger(__name__) + class SSOClient: """ @@ -55,14 +58,35 @@ def create_group_mapping( :param atlan_group: existing Atlan group. :param sso_group_name: name of the SSO group. :raises AtlanError: on any error during API invocation. + :raises InvalidRequestError: if the group doesn't have required fields or mapping already exists. :returns: created SSO group mapping instance. """ + # Validate the group has required fields + if not atlan_group.id: + raise ErrorCode.SSO_MAPPING_VALIDATION_ERROR.exception_with_parameters( + "Atlan group must have an 'id' field populated", + sso_alias, + ) + if not atlan_group.name: + raise ErrorCode.SSO_MAPPING_VALIDATION_ERROR.exception_with_parameters( + "Atlan group must have a 'name' field populated", + sso_alias, + ) + + logger.info( + f"Creating SSO group mapping: {atlan_group.alias} (ID: {atlan_group.id}) " + f"<-> {sso_group_name} (SSO: {sso_alias})" + ) + self._check_existing_group_mappings(sso_alias, atlan_group) endpoint, request_obj = SSOCreateGroupMapping.prepare_request( sso_alias, atlan_group, sso_group_name ) raw_json = self._client._call_api(endpoint, request_obj=request_obj) - return SSOCreateGroupMapping.process_response(raw_json) + result = SSOCreateGroupMapping.process_response(raw_json) + + logger.info(f"Successfully created SSO group mapping with ID: {result.id}") + return result @validate_arguments def update_group_mapping( @@ -122,13 +146,28 @@ def delete_group_mapping(self, sso_alias: str, group_map_id: str) -> None: """ Deletes an existing Atlan SSO group mapping. + Note: This only deletes the SSO mapping (identity provider mapper). + If you're experiencing issues with Okta Push Groups failing with + stale externalId errors, you may need to run the diagnostic script + to identify and clean up orphaned mappings. + :param sso_alias: name of the SSO provider. :param group_map_id: existing SSO group map identifier. :raises AtlanError: on any error during API invocation. :returns: an empty response (`None`). """ + logger.info(f"Deleting SSO group mapping: {group_map_id} (SSO: {sso_alias})") + endpoint, request_obj = SSODeleteGroupMapping.prepare_request( sso_alias, group_map_id ) raw_json = self._client._call_api(endpoint, request_obj=request_obj) + + logger.info(f"Successfully deleted SSO group mapping: {group_map_id}") + logger.debug( + "If you're experiencing Okta Push Groups issues with stale externalId, " + "run: python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings " + f"--mode diagnose --sso-alias {sso_alias}" + ) + return raw_json diff --git a/pyatlan/errors.py b/pyatlan/errors.py index 9fd801a30..bd3bd7544 100644 --- a/pyatlan/errors.py +++ b/pyatlan/errors.py @@ -539,6 +539,24 @@ class ErrorCode(Enum): "You can use SSOClient.update_group_mapping() to update the existing group mapping.", InvalidRequestError, ) + SSO_GROUP_NOT_FOUND = ( + 404, + "ATLAN-PYTHON-404-031", + "Atlan group '{0}' (ID: {1}) not found. This may indicate an orphaned SSO group mapping.", + "The SSO mapping references a group that no longer exists. " + "Run the diagnostic script to identify and clean up orphaned mappings: " + "python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings --mode diagnose --sso-alias {2}", + NotFoundError, + ) + SSO_MAPPING_VALIDATION_ERROR = ( + 400, + "ATLAN-PYTHON-400-059", + "SSO mapping validation failed: {0}", + "Ensure the Atlan group exists before creating an SSO mapping. " + "You may have orphaned mappings that need cleanup. " + "Run: python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings --mode diagnose --sso-alias {1}", + InvalidRequestError, + ) INVALID_UPLOAD_FILE_PATH = ( 400, "ATLAN-PYTHON-400-059", diff --git a/pyatlan/samples/sso/README.md b/pyatlan/samples/sso/README.md new file mode 100644 index 000000000..b4e782b39 --- /dev/null +++ b/pyatlan/samples/sso/README.md @@ -0,0 +1,89 @@ +# SSO Group Mapping Utilities + +This directory contains utilities for managing SSO (Single Sign-On) group mappings in Atlan. + +## Available Tools + +### Diagnose Orphaned Group Mappings + +**File:** `diagnose_orphaned_group_mappings.py` + +A diagnostic and cleanup utility for identifying and resolving orphaned SSO group mappings that can cause Okta Push Groups to fail with stale `externalId` errors. + +#### Quick Start + +```bash +# Set up environment +export ATLAN_BASE_URL="https://your-tenant.atlan.com" +export ATLAN_API_KEY="your-api-key" + +# Diagnose all SSO group mappings +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta + +# Diagnose a specific group +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta \ + --group-name grpAtlanProdWorkflowAdmin + +# Clean up orphaned mappings (interactive) +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta + +# List all SSO group mappings +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode list \ + --sso-alias okta +``` + +#### Common Use Cases + +1. **Okta Push Groups Failing with Stale externalId** + - Run diagnostic to identify orphaned mappings + - Clean up orphaned mappings + - In Okta, unlink and re-link the group (use "Link Group", not "Create") + +2. **Regular Maintenance** + - Periodically run diagnostic to catch orphaned mappings early + - Schedule as part of your regular Atlan maintenance routine + +3. **Group Migration or Cleanup** + - Before deleting groups, check for associated SSO mappings + - Clean up mappings before deleting the group to avoid orphans + +#### Options + +- `--mode`: Operation mode + - `diagnose`: Check for orphaned mappings (recommended first step) + - `cleanup`: Remove orphaned mappings (interactive by default) + - `list`: Display all SSO group mappings + +- `--sso-alias`: SSO provider alias (e.g., `okta`, `azure`, `jumpcloud`) + +- `--group-name`: (Optional) Check only a specific group + +- `--non-interactive`: Run cleanup without prompting (use with caution!) + +## Documentation + +For detailed information about SSO group mapping troubleshooting, see: +- [SSO Troubleshooting Guide](../../../docs/sso_troubleshooting.md) +- [SSO Client API Documentation](../../../docs/client/sso.rst) + +## Support + +If you encounter issues or need help: + +1. Run the diagnostic script and save the output +2. Check the [SSO Troubleshooting Guide](../../../docs/sso_troubleshooting.md) +3. If the issue persists, contact Atlan Support with: + - Diagnostic output + - Full error messages + - Steps you've already tried + +## Related Issues + +- **LINTEST-425**: Okta Push Groups stale externalId / orphaned mapping issue diff --git a/pyatlan/samples/sso/__init__.py b/pyatlan/samples/sso/__init__.py new file mode 100644 index 000000000..746515783 --- /dev/null +++ b/pyatlan/samples/sso/__init__.py @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2025 Atlan Pte. Ltd. +""" +SSO (Single Sign-On) utilities for Atlan. + +This module provides utilities for managing and troubleshooting +SSO group mappings in Atlan. +""" diff --git a/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py b/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py new file mode 100644 index 000000000..28e308f97 --- /dev/null +++ b/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py @@ -0,0 +1,398 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2025 Atlan Pte. Ltd. +""" +Diagnostic and cleanup utility for orphaned SSO group mappings. + +This script helps diagnose and resolve issues where Okta Push Groups fails +with stale externalId errors. This typically happens when: +1. An SSO group mapping is deleted but Okta still references the old externalId +2. A group is recreated with the same name but Okta pushes with old externalId +3. Unlinking and re-linking groups doesn't clear the orphaned mapping + +Usage: + # Diagnose orphaned mappings + python diagnose_orphaned_group_mappings.py --mode diagnose --sso-alias + + # Diagnose specific group + python diagnose_orphaned_group_mappings.py --mode diagnose \\ + --sso-alias --group-name + + # Clean up orphaned mappings (interactive) + python diagnose_orphaned_group_mappings.py --mode cleanup --sso-alias + + # List all SSO group mappings + python diagnose_orphaned_group_mappings.py --mode list --sso-alias + +Environment Variables: + ATLAN_BASE_URL: Base URL of your Atlan tenant (e.g., https://apex.atlan.com) + ATLAN_API_KEY: API key for authentication +""" + +import argparse +import logging +import sys +from typing import Dict, List, Optional + +from pyatlan.client.atlan import AtlanClient +from pyatlan.errors import AtlanError +from pyatlan.model.group import AtlanGroup +from pyatlan.model.sso import SSOMapper + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + + +class SSOGroupMappingDiagnostic: + """Utility to diagnose and clean up orphaned SSO group mappings.""" + + def __init__(self, client: AtlanClient): + self.client = client + + def get_all_groups(self) -> Dict[str, AtlanGroup]: + """ + Retrieve all Atlan groups. + + :returns: Dictionary mapping group ID to AtlanGroup + """ + logger.info("Retrieving all Atlan groups...") + groups = {} + + try: + response = self.client.group.get_all(limit=100) + for group in response: + if group.id: + groups[group.id] = group + logger.info(f"Found {len(groups)} Atlan groups") + except AtlanError as e: + logger.error(f"Error retrieving groups: {e}") + raise + + return groups + + def get_all_sso_mappings(self, sso_alias: str) -> List[SSOMapper]: + """ + Retrieve all SSO group mappings for a given SSO provider. + + :param sso_alias: SSO provider alias (e.g., 'okta', 'azure', 'jumpcloud') + :returns: List of SSO group mappings + """ + logger.info(f"Retrieving SSO group mappings for '{sso_alias}'...") + + try: + mappings = self.client.sso.get_all_group_mappings(sso_alias=sso_alias) + logger.info(f"Found {len(mappings)} SSO group mappings") + return mappings + except AtlanError as e: + logger.error(f"Error retrieving SSO mappings: {e}") + raise + + def diagnose_orphaned_mappings( + self, sso_alias: str, target_group_name: Optional[str] = None + ) -> Dict[str, List[SSOMapper]]: + """ + Diagnose orphaned SSO group mappings. + + An orphaned mapping is one where: + - The mapping references a group ID that doesn't exist + - The mapping's group name doesn't match the actual group name + + :param sso_alias: SSO provider alias + :param target_group_name: Optional specific group name to check + :returns: Dictionary with 'orphaned' and 'valid' mapping lists + """ + logger.info("=" * 80) + logger.info("DIAGNOSING SSO GROUP MAPPINGS") + logger.info("=" * 80) + + groups = self.get_all_groups() + mappings = self.get_all_sso_mappings(sso_alias) + + # Create reverse lookup: group name -> group ID + group_name_to_id = {g.name: g.id for g in groups.values() if g.name} + + orphaned = [] + valid = [] + suspicious = [] + + logger.info("\nAnalyzing mappings...") + logger.info("-" * 80) + + for mapping in mappings: + if ( + not mapping.name + or not mapping.config + or not mapping.config.group_name + ): + logger.warning(f"āš ļø Mapping {mapping.id} has incomplete data") + suspicious.append(mapping) + continue + + # Extract group ID from mapping name (format: --) + mapping_group_id = ( + mapping.name.split("--")[0] if "--" in mapping.name else None + ) + mapped_group_name = mapping.config.group_name + + # Skip if target group specified and this isn't it + if target_group_name and mapped_group_name != target_group_name: + continue + + # Check if this is orphaned + is_orphaned = False + orphan_reason = [] + + # Case 1: Group ID in mapping doesn't exist + if mapping_group_id and mapping_group_id not in groups: + is_orphaned = True + orphan_reason.append(f"Group ID '{mapping_group_id}' not found") + + # Case 2: Group name doesn't match any existing group + if mapped_group_name not in group_name_to_id: + is_orphaned = True + orphan_reason.append(f"Group name '{mapped_group_name}' not found") + + # Case 3: Group ID in mapping doesn't match current group with that name + elif ( + mapping_group_id + and group_name_to_id.get(mapped_group_name) != mapping_group_id + ): + is_orphaned = True + current_id = group_name_to_id.get(mapped_group_name) + orphan_reason.append( + f"Group ID mismatch: mapping has '{mapping_group_id}' " + f"but current group has '{current_id}'" + ) + + if is_orphaned: + orphaned.append(mapping) + logger.warning(f"šŸ”“ ORPHANED: {mapping.id}") + logger.warning(f" Mapping Name: {mapping.name}") + logger.warning(f" Group Name: {mapped_group_name}") + logger.warning(f" SSO Group: {mapping.config.attribute_value}") + logger.warning(f" Reason: {', '.join(orphan_reason)}") + logger.warning("") + else: + valid.append(mapping) + if target_group_name: + logger.info(f"āœ… VALID: {mapping.id}") + logger.info(f" Mapping Name: {mapping.name}") + logger.info(f" Group Name: {mapped_group_name}") + logger.info(f" SSO Group: {mapping.config.attribute_value}") + logger.info("") + + logger.info("=" * 80) + logger.info("SUMMARY") + logger.info("=" * 80) + logger.info(f"Total mappings: {len(mappings)}") + logger.info(f"āœ… Valid mappings: {len(valid)}") + logger.info(f"šŸ”“ Orphaned mappings: {len(orphaned)}") + logger.info(f"āš ļø Suspicious mappings: {len(suspicious)}") + logger.info("=" * 80) + + return { + "orphaned": orphaned, + "valid": valid, + "suspicious": suspicious, + } + + def list_all_mappings(self, sso_alias: str) -> None: + """ + List all SSO group mappings in a readable format. + + :param sso_alias: SSO provider alias + """ + logger.info("=" * 80) + logger.info(f"SSO GROUP MAPPINGS FOR '{sso_alias}'") + logger.info("=" * 80) + + mappings = self.get_all_sso_mappings(sso_alias) + + if not mappings: + logger.info("No SSO group mappings found") + return + + for i, mapping in enumerate(mappings, 1): + logger.info(f"\n{i}. Mapping ID: {mapping.id}") + logger.info(f" Mapping Name: {mapping.name}") + if mapping.config: + logger.info(f" Atlan Group: {mapping.config.group_name}") + logger.info(f" SSO Group: {mapping.config.attribute_value}") + logger.info(f" Sync Mode: {mapping.config.sync_mode}") + logger.info(f" Provider: {mapping.identity_provider_alias}") + logger.info(f" Mapper Type: {mapping.identity_provider_mapper}") + + logger.info("\n" + "=" * 80) + + def cleanup_orphaned_mappings( + self, sso_alias: str, interactive: bool = True + ) -> int: + """ + Clean up orphaned SSO group mappings. + + :param sso_alias: SSO provider alias + :param interactive: If True, ask for confirmation before each deletion + :returns: Number of mappings deleted + """ + logger.info("=" * 80) + logger.info("CLEANUP MODE") + logger.info("=" * 80) + + results = self.diagnose_orphaned_mappings(sso_alias) + orphaned = results["orphaned"] + + if not orphaned: + logger.info("\nāœ… No orphaned mappings found. Nothing to clean up!") + return 0 + + logger.warning(f"\nāš ļø Found {len(orphaned)} orphaned mapping(s) to clean up") + + deleted_count = 0 + + for mapping in orphaned: + if not mapping.id: + logger.warning("Skipping mapping with no ID") + continue + + if interactive: + logger.info("\n" + "-" * 80) + logger.info(f"Mapping ID: {mapping.id}") + logger.info(f"Mapping Name: {mapping.name}") + if mapping.config: + logger.info(f"Group Name: {mapping.config.group_name}") + logger.info(f"SSO Group: {mapping.config.attribute_value}") + + response = input("\nDelete this mapping? (y/n): ").strip().lower() + + if response != "y": + logger.info("Skipped") + continue + + try: + logger.info(f"Deleting mapping {mapping.id}...") + self.client.sso.delete_group_mapping( + sso_alias=sso_alias, + group_map_id=mapping.id, + ) + logger.info(f"āœ… Successfully deleted mapping {mapping.id}") + deleted_count += 1 + except AtlanError as e: + logger.error(f"āŒ Error deleting mapping {mapping.id}: {e}") + + logger.info("\n" + "=" * 80) + logger.info(f"Cleanup complete. Deleted {deleted_count} orphaned mapping(s)") + logger.info("=" * 80) + + return deleted_count + + +def main(): + """Main entry point for the diagnostic script.""" + parser = argparse.ArgumentParser( + description="Diagnose and clean up orphaned SSO group mappings", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + + parser.add_argument( + "--mode", + required=True, + choices=["diagnose", "cleanup", "list"], + help=( + "Operation mode: diagnose (check for issues), " + "cleanup (remove orphaned mappings), or list (show all mappings)" + ), + ) + + parser.add_argument( + "--sso-alias", + required=True, + help="SSO provider alias (e.g., 'okta', 'azure', 'jumpcloud')", + ) + + parser.add_argument( + "--group-name", + help="Optional: Specific Atlan group name to diagnose", + ) + + parser.add_argument( + "--non-interactive", + action="store_true", + help="Run cleanup without prompting for confirmation (dangerous!)", + ) + + args = parser.parse_args() + + # Initialize client + try: + client = AtlanClient() + logger.info(f"Connected to Atlan at {client.base_url}") + except Exception as e: + logger.error(f"Failed to initialize Atlan client: {e}") + logger.error("Make sure ATLAN_BASE_URL and ATLAN_API_KEY are set") + sys.exit(1) + + # Initialize diagnostic tool + diagnostic = SSOGroupMappingDiagnostic(client) + + # Execute requested mode + try: + if args.mode == "list": + diagnostic.list_all_mappings(args.sso_alias) + + elif args.mode == "diagnose": + results = diagnostic.diagnose_orphaned_mappings( + sso_alias=args.sso_alias, + target_group_name=args.group_name, + ) + + if results["orphaned"]: + logger.info( + "\nšŸ’” TIP: Run with --mode cleanup to remove orphaned mappings" + ) + sys.exit(1) # Exit with error code if orphaned mappings found + + elif args.mode == "cleanup": + if args.non_interactive: + logger.warning("āš ļø Running in non-interactive mode!") + logger.warning( + "āš ļø All orphaned mappings will be deleted automatically!" + ) + response = ( + input("Are you sure you want to continue? (yes/no): ") + .strip() + .lower() + ) + if response != "yes": + logger.info("Cleanup cancelled") + sys.exit(0) + + deleted = diagnostic.cleanup_orphaned_mappings( + sso_alias=args.sso_alias, + interactive=not args.non_interactive, + ) + + if deleted > 0: + logger.info("\nāœ… Cleanup complete!") + logger.info("\nšŸ’” NEXT STEPS:") + logger.info( + "1. In Okta, try unlinking and re-linking the affected group" + ) + logger.info("2. Use 'Link Group' (not 'Create') when re-linking") + logger.info( + "3. The group push should now work without the stale externalId error" + ) + + except KeyboardInterrupt: + logger.info("\n\nOperation cancelled by user") + sys.exit(130) + except Exception as e: + logger.error(f"\nāŒ Unexpected error: {e}", exc_info=True) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tests/unit/test_sso_diagnostic.py b/tests/unit/test_sso_diagnostic.py new file mode 100644 index 000000000..e50cdd62a --- /dev/null +++ b/tests/unit/test_sso_diagnostic.py @@ -0,0 +1,302 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2025 Atlan Pte. Ltd. +from unittest.mock import Mock, patch + +import pytest + +from pyatlan.client.atlan import AtlanClient +from pyatlan.model.group import AtlanGroup, GroupResponse +from pyatlan.model.sso import SSOMapper, SSOMapperConfig +from pyatlan.samples.sso.diagnose_orphaned_group_mappings import ( + SSOGroupMappingDiagnostic, +) + + +@pytest.fixture +def mock_client(): + """Create a mock AtlanClient for testing.""" + return Mock(spec=AtlanClient) + + +@pytest.fixture +def diagnostic(mock_client): + """Create a SSOGroupMappingDiagnostic instance.""" + return SSOGroupMappingDiagnostic(mock_client) + + +@pytest.fixture +def sample_groups(): + """Create sample Atlan groups for testing.""" + return { + "group-id-1": AtlanGroup( + id="group-id-1", + name="test_group_1", + alias="Test Group 1", + ), + "group-id-2": AtlanGroup( + id="group-id-2", + name="test_group_2", + alias="Test Group 2", + ), + } + + +@pytest.fixture +def sample_mappings(): + """Create sample SSO mappings for testing.""" + return [ + # Valid mapping for group-id-1 + SSOMapper( + id="mapping-1", + name="group-id-1--1234567890", + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=SSOMapperConfig( + group_name="test_group_1", + attribute_value="okta-group-1", + sync_mode="FORCE", + attribute_name="memberOf", + ), + ), + # Valid mapping for group-id-2 + SSOMapper( + id="mapping-2", + name="group-id-2--1234567891", + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=SSOMapperConfig( + group_name="test_group_2", + attribute_value="okta-group-2", + sync_mode="FORCE", + attribute_name="memberOf", + ), + ), + # Orphaned mapping - group doesn't exist + SSOMapper( + id="mapping-3", + name="group-id-999--1234567892", + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=SSOMapperConfig( + group_name="deleted_group", + attribute_value="okta-group-3", + sync_mode="FORCE", + attribute_name="memberOf", + ), + ), + # Orphaned mapping - group ID mismatch + SSOMapper( + id="mapping-4", + name="group-id-old--1234567893", + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=SSOMapperConfig( + group_name="test_group_1", # Points to existing group + attribute_value="okta-group-4", + sync_mode="FORCE", + attribute_name="memberOf", + ), + ), + ] + + +def test_get_all_groups(diagnostic, mock_client, sample_groups): + """Test retrieving all Atlan groups.""" + # Setup mock + mock_response = Mock(spec=GroupResponse) + mock_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_response + + # Execute + result = diagnostic.get_all_groups() + + # Verify + assert len(result) == 2 + assert "group-id-1" in result + assert "group-id-2" in result + assert result["group-id-1"].name == "test_group_1" + mock_client.group.get_all.assert_called_once() + + +def test_get_all_sso_mappings(diagnostic, mock_client, sample_mappings): + """Test retrieving all SSO group mappings.""" + # Setup mock + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + + # Execute + result = diagnostic.get_all_sso_mappings("okta") + + # Verify + assert len(result) == 4 + assert result[0].id == "mapping-1" + mock_client.sso.get_all_group_mappings.assert_called_once_with(sso_alias="okta") + + +def test_diagnose_finds_orphaned_mappings( + diagnostic, mock_client, sample_groups, sample_mappings +): + """Test that diagnose correctly identifies orphaned mappings.""" + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + + # Execute + results = diagnostic.diagnose_orphaned_mappings("okta") + + # Verify + assert len(results["valid"]) == 2 + assert len(results["orphaned"]) == 2 + + # Check valid mappings + valid_ids = {m.id for m in results["valid"]} + assert "mapping-1" in valid_ids + assert "mapping-2" in valid_ids + + # Check orphaned mappings + orphaned_ids = {m.id for m in results["orphaned"]} + assert "mapping-3" in orphaned_ids # Group doesn't exist + assert "mapping-4" in orphaned_ids # Group ID mismatch + + +def test_diagnose_with_target_group_filter( + diagnostic, mock_client, sample_groups, sample_mappings +): + """Test diagnosing a specific group.""" + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + + # Execute - filter for test_group_1 + results = diagnostic.diagnose_orphaned_mappings( + "okta", target_group_name="test_group_1" + ) + + # Verify - should only check mappings for test_group_1 + # mapping-1 is valid, mapping-4 is orphaned (both reference test_group_1) + assert len(results["valid"]) == 1 + assert len(results["orphaned"]) == 1 + assert results["valid"][0].id == "mapping-1" + assert results["orphaned"][0].id == "mapping-4" + + +def test_diagnose_handles_incomplete_mappings(diagnostic, mock_client, sample_groups): + """Test handling of mappings with incomplete data.""" + # Create mapping with missing config + incomplete_mapping = SSOMapper( + id="mapping-incomplete", + name=None, # Missing name + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=None, # Missing config + ) + + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = [incomplete_mapping] + + # Execute + results = diagnostic.diagnose_orphaned_mappings("okta") + + # Verify - incomplete mapping should be in suspicious list + assert len(results["suspicious"]) == 1 + assert results["suspicious"][0].id == "mapping-incomplete" + + +def test_cleanup_orphaned_mappings_interactive( + diagnostic, mock_client, sample_groups, sample_mappings +): + """Test interactive cleanup of orphaned mappings.""" + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + mock_client.sso.delete_group_mapping.return_value = None + + # Mock user input - say yes to first, no to second + with patch("builtins.input", side_effect=["y", "n"]): + # Execute + deleted_count = diagnostic.cleanup_orphaned_mappings("okta", interactive=True) + + # Verify - should delete only the first orphaned mapping + assert deleted_count == 1 + mock_client.sso.delete_group_mapping.assert_called_once() + + +def test_cleanup_orphaned_mappings_non_interactive( + diagnostic, mock_client, sample_groups, sample_mappings +): + """Test non-interactive cleanup of all orphaned mappings.""" + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + mock_client.sso.delete_group_mapping.return_value = None + + # Execute + deleted_count = diagnostic.cleanup_orphaned_mappings("okta", interactive=False) + + # Verify - should delete both orphaned mappings + assert deleted_count == 2 + assert mock_client.sso.delete_group_mapping.call_count == 2 + + +def test_cleanup_no_orphaned_mappings(diagnostic, mock_client, sample_groups): + """Test cleanup when there are no orphaned mappings.""" + # Create only valid mappings + valid_mapping = SSOMapper( + id="mapping-1", + name="group-id-1--1234567890", + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=SSOMapperConfig( + group_name="test_group_1", + attribute_value="okta-group-1", + sync_mode="FORCE", + attribute_name="memberOf", + ), + ) + + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = [valid_mapping] + + # Execute + deleted_count = diagnostic.cleanup_orphaned_mappings("okta") + + # Verify - no deletions should occur + assert deleted_count == 0 + mock_client.sso.delete_group_mapping.assert_not_called() + + +def test_list_all_mappings(diagnostic, mock_client, sample_mappings): + """Test listing all SSO group mappings.""" + # Setup mock + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + + # Execute - should not raise any errors + diagnostic.list_all_mappings("okta") + + # Verify + mock_client.sso.get_all_group_mappings.assert_called_once_with(sso_alias="okta") + + +def test_list_all_mappings_empty(diagnostic, mock_client): + """Test listing when no mappings exist.""" + # Setup mock + mock_client.sso.get_all_group_mappings.return_value = [] + + # Execute - should not raise any errors + diagnostic.list_all_mappings("okta") + + # Verify + mock_client.sso.get_all_group_mappings.assert_called_once_with(sso_alias="okta")