From c62669ca33675b7ad6e36550d8079dfac84b8f70 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 11 Feb 2026 14:22:01 +0000 Subject: [PATCH 1/4] feat: Add diagnostic and cleanup tools for orphaned SSO group mappings (LINTEST-425) This commit resolves the issue where Okta Push Groups fails with stale externalId errors due to orphaned SSO group mappings in Keycloak. Changes: - Add diagnostic script to identify orphaned SSO group mappings - Add cleanup functionality to remove orphaned mappings - Enhance SSO client with better validation and logging - Add new error codes for SSO mapping issues - Create comprehensive documentation for troubleshooting - Add support guide for resolving orphaned mapping issues - Add unit tests for diagnostic utility The diagnostic script can: 1. Identify orphaned mappings (where group ID no longer exists) 2. Clean up orphaned mappings (interactive or automated) 3. List all SSO group mappings for a provider 4. Filter diagnostics by specific group name Usage: python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ --mode diagnose --sso-alias okta Fixes: LINTEST-425 --- docs/SUPPORT_GUIDE_SSO_ORPHANED_MAPPINGS.md | 406 ++++++++++++++++++ docs/sso_troubleshooting.md | 400 +++++++++++++++++ pyatlan/client/aio/sso.py | 41 +- pyatlan/client/sso.py | 41 +- pyatlan/errors.py | 18 + pyatlan/samples/sso/README.md | 89 ++++ pyatlan/samples/sso/__init__.py | 8 + .../sso/diagnose_orphaned_group_mappings.py | 369 ++++++++++++++++ tests/unit/test_sso_diagnostic.py | 302 +++++++++++++ 9 files changed, 1672 insertions(+), 2 deletions(-) create mode 100644 docs/SUPPORT_GUIDE_SSO_ORPHANED_MAPPINGS.md create mode 100644 docs/sso_troubleshooting.md create mode 100644 pyatlan/samples/sso/README.md create mode 100644 pyatlan/samples/sso/__init__.py create mode 100644 pyatlan/samples/sso/diagnose_orphaned_group_mappings.py create mode 100644 tests/unit/test_sso_diagnostic.py diff --git a/docs/SUPPORT_GUIDE_SSO_ORPHANED_MAPPINGS.md b/docs/SUPPORT_GUIDE_SSO_ORPHANED_MAPPINGS.md new file mode 100644 index 000000000..64e87b0d7 --- /dev/null +++ b/docs/SUPPORT_GUIDE_SSO_ORPHANED_MAPPINGS.md @@ -0,0 +1,406 @@ +# Support Guide: Resolving Orphaned SSO Group Mappings + +**Issue Type:** Okta Push Groups / SCIM Group Provisioning Failures +**Error Pattern:** "stale externalId" or "orphaned mapping" +**Severity:** Medium - Blocks specific group from being provisioned +**Related:** LINTEST-425 + +## Quick Reference + +### Symptoms Checklist + +- [ ] Customer reports Okta Push Groups failing for a specific group +- [ ] Error message mentions "externalId" or "unable to get the group" +- [ ] Other groups with similar settings work fine +- [ ] Issue persists after unlinking/re-linking in Okta +- [ ] Issue persists after deleting and recreating the Atlan group + +If 3+ of these apply, this is likely an orphaned SSO mapping issue. + +### Quick Resolution (5 minutes) + +```bash +# 1. Set up environment +export ATLAN_BASE_URL="https://customer-tenant.atlan.com" +export ATLAN_API_KEY="" + +# 2. Run diagnostic +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta \ + --group-name + +# 3. If orphaned mapping found, clean it up +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta + +# 4. Verify cleanup +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta \ + --group-name +``` + +## Detailed Resolution Steps + +### Step 1: Gather Information + +Collect the following from the customer: + +1. **Tenant Details:** + - Tenant URL (e.g., `https://apex.atlan.com`) + - Affected group name (e.g., `grpAtlanProdWorkflowAdmin`) + - SSO provider (Okta, Azure AD, JumpCloud, etc.) + +2. **Error Details:** + - Full error message from the SSO provider + - Screenshot of the error (if available) + - When did the issue start? + +3. **What Has Been Tried:** + - Have they unlinked/re-linked the group? + - Have they deleted and recreated the Atlan group? + - Are there any recent changes to SSO configuration? + +### Step 2: Run Diagnostics + +#### Option A: Using the Python SDK (Recommended) + +```bash +# Install pyatlan if not already available +pip install pyatlan + +# Set environment variables +export ATLAN_BASE_URL="https://customer-tenant.atlan.com" +export ATLAN_API_KEY="" + +# Run diagnostic for the specific group +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta \ + --group-name grpAtlanProdWorkflowAdmin +``` + +**Expected Output:** + +If orphaned mapping exists: +``` +šŸ”“ ORPHANED: mapping-id-123 + Mapping Name: group-id-old--1234567890 + Group Name: grpAtlanProdWorkflowAdmin + SSO Group: grpAtlanProdWorkflowAdmin + Reason: Group ID mismatch: mapping has 'group-id-old' but current group has 'group-id-new' +``` + +If no issues found: +``` +āœ… VALID: mapping-id-456 + Mapping Name: group-id-current--1234567891 + Group Name: grpAtlanProdWorkflowAdmin + SSO Group: grpAtlanProdWorkflowAdmin +``` + +#### Option B: Manual API Check + +```bash +# Get all SSO mappings +curl -X GET \ + "${ATLAN_BASE_URL}/api/service/auth/admin/realms/default/identity-provider/instances/okta/mappers" \ + -H "Authorization: Bearer ${ATLAN_API_KEY}" + +# Get all groups +curl -X GET \ + "${ATLAN_BASE_URL}/api/service/groups" \ + -H "Authorization: Bearer ${ATLAN_API_KEY}" +``` + +### Step 3: Clean Up Orphaned Mappings + +#### Automated Cleanup (Recommended) + +```bash +# Interactive cleanup (asks for confirmation) +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta + +# Non-interactive (use with caution) +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta \ + --non-interactive +``` + +#### Manual Cleanup + +If you prefer to delete specific mappings manually: + +```python +from pyatlan.client.atlan import AtlanClient + +client = AtlanClient() + +# Delete the orphaned mapping +client.sso.delete_group_mapping( + sso_alias="okta", + group_map_id="mapping-id-to-delete" +) + +print("āœ… Deleted orphaned mapping") +``` + +Or via REST API: + +```bash +curl -X POST \ + "${ATLAN_BASE_URL}/api/service/auth/admin/realms/default/identity-provider/instances/okta/mappers/${MAPPING_ID}/delete" \ + -H "Authorization: Bearer ${ATLAN_API_KEY}" +``` + +### Step 4: Verify and Re-link in Okta + +After cleaning up the orphaned mapping: + +1. **Verify the Atlan group exists:** + ```bash + python -c " + from pyatlan.client.atlan import AtlanClient + client = AtlanClient() + groups = client.group.get_by_name(alias='grpAtlanProdWorkflowAdmin') + if groups and groups.records: + print(f'āœ… Group exists: {groups.records[0].id}') + else: + print('āŒ Group not found - needs to be created') + " + ``` + +2. **In Okta:** + - Go to Applications → Atlan → Push Groups + - **Unlink** the affected group (do not delete in Okta) + - **Re-link** using "Link Group" (NOT "Create") + - Select the existing Atlan group from the dropdown + +3. **Test the push:** + - Save the push group configuration + - Trigger a push + - Verify the group members are synced + +### Step 5: Follow-Up + +1. **Run diagnostic again** to confirm no orphaned mappings remain: + ```bash + python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta + ``` + +2. **Check for other affected groups:** + - If this is a widespread issue, run diagnostics without `--group-name` + - Clean up all orphaned mappings + +3. **Document the incident:** + - Note the group name(s) affected + - Record the orphaned mapping ID(s) that were deleted + - Update the customer ticket with resolution details + +## Common Scenarios + +### Scenario 1: Group Deleted and Recreated + +**Situation:** Customer deleted an Atlan group and created a new one with the same name. + +**Problem:** The old SSO mapping still references the old group ID. + +**Solution:** +1. Run diagnostic to identify the orphaned mapping +2. Delete the orphaned mapping +3. In Okta, unlink and re-link the group + +### Scenario 2: SSO Configuration Changed + +**Situation:** Customer changed SSO provider (e.g., from Azure to Okta) or reconfigured SSO. + +**Problem:** Old mappings from the previous SSO configuration still exist. + +**Solution:** +1. Run diagnostic for BOTH SSO providers (old and new) +2. Clean up orphaned mappings from the old provider +3. Verify mappings for the new provider are correct + +### Scenario 3: Multiple Groups Affected + +**Situation:** Multiple groups are experiencing the same issue. + +**Problem:** Likely a systemic issue from a bulk group operation. + +**Solution:** +1. Run diagnostic for all groups: `--mode diagnose --sso-alias okta` +2. Use non-interactive cleanup: `--mode cleanup --sso-alias okta --non-interactive` +3. Have customer re-link all affected groups in Okta + +### Scenario 4: Recurring Issue + +**Situation:** The same group keeps getting orphaned mappings. + +**Problem:** Customer is likely using "Create" instead of "Link Group" in Okta, or there's an automation issue. + +**Solution:** +1. Clean up the orphaned mapping +2. **Educate the customer** on proper linking procedure +3. Check if any automation or scripts are incorrectly managing groups +4. Consider creating a monitoring alert for this specific group + +## Escalation Criteria + +Escalate to engineering if: + +- [ ] Diagnostic script fails to run or errors out +- [ ] Cleanup doesn't resolve the issue (orphaned mapping still present after deletion) +- [ ] Issue recurs immediately after cleanup +- [ ] No orphaned mappings found but the issue persists +- [ ] Backend logs show errors related to Keycloak or SCIM provisioning +- [ ] Multiple customers reporting the same issue + +## Prevention Recommendations + +Share these best practices with customers: + +### For Support Teams + +1. **Regular Audits:** + - Run diagnostic monthly for all SSO providers + - Clean up orphaned mappings proactively + +2. **Documentation:** + - Maintain a runbook for common SSO issues + - Document customer's SSO configuration + +3. **Monitoring:** + - Set up alerts for SSO provisioning failures + - Track orphaned mapping incidents + +### For Customers + +1. **Proper Group Deletion:** + - Before deleting a group, unlink it from SSO first + - Use the diagnostic script before bulk group operations + +2. **Correct Linking Procedure:** + - Always use "Link Group" (not "Create") when re-linking + - Verify the group exists in Atlan before linking + +3. **Testing:** + - Test group pushes in a non-production tenant first + - Verify membership sync after any SSO configuration changes + +## Technical Details + +### What Are Orphaned Mappings? + +An SSO group mapping (Keycloak Identity Provider Mapper) creates a link between: +- An Atlan group (identified by group ID) +- An SSO group (identified by group name in the SSO provider) + +When a group is deleted or recreated, the new group gets a new ID, but the old mapping still references the old ID. This is an "orphaned" mapping. + +### Why Does Okta Fail? + +When Okta tries to push a group: +1. Okta sends a SCIM request with the group's `externalId` +2. Atlan/Keycloak looks up the group by `externalId` +3. If the mapping is orphaned, Keycloak can't find the group +4. The push fails with "unable to get the group" error + +### What Does Cleanup Do? + +The cleanup process: +1. Identifies mappings that reference non-existent groups +2. Deletes these orphaned mappings from Keycloak +3. Allows Okta to create a fresh mapping on the next push + +## API Reference + +### Diagnostic Script + +``` +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings [OPTIONS] + +Options: + --mode {diagnose,cleanup,list} Operation mode (required) + --sso-alias TEXT SSO provider alias (required) + --group-name TEXT Specific group to check (optional) + --non-interactive Skip prompts in cleanup mode +``` + +### SSO Client Methods + +```python +from pyatlan.client.atlan import AtlanClient + +client = AtlanClient() + +# Get all SSO group mappings +mappings = client.sso.get_all_group_mappings(sso_alias="okta") + +# Get specific mapping +mapping = client.sso.get_group_mapping( + sso_alias="okta", + group_map_id="mapping-id" +) + +# Delete mapping +client.sso.delete_group_mapping( + sso_alias="okta", + group_map_id="mapping-id" +) + +# Create new mapping +group = client.group.get_by_name(alias="MyGroup").records[0] +new_mapping = client.sso.create_group_mapping( + sso_alias="okta", + atlan_group=group, + sso_group_name="okta-group-name" +) +``` + +## Troubleshooting the Diagnostic Script + +### Script Won't Run + +**Problem:** `No module named pyatlan` + +**Solution:** +```bash +pip install pyatlan +# or +pip install --upgrade pyatlan +``` + +### Authentication Errors + +**Problem:** `401 Unauthorized` or `403 Forbidden` + +**Solution:** +- Verify `ATLAN_BASE_URL` is correct (include `https://`) +- Verify `ATLAN_API_KEY` has admin permissions +- Check API key hasn't expired + +### No Orphaned Mappings Found + +**Problem:** Diagnostic shows all mappings as valid, but issue persists + +**Possible Causes:** +- Wrong SSO alias (check if it's `okta`, `azure`, `jumpcloud`, etc.) +- Wrong group name (check exact name including capitalization) +- Issue is not related to orphaned mappings (escalate) + +## Related Documentation + +- [SSO Troubleshooting Guide](./sso_troubleshooting.md) +- [SSO Client API](./client/sso.rst) +- [Group Management API](./client/group.rst) + +## Changelog + +- **2025-02-11:** Initial support guide created for LINTEST-425 diff --git a/docs/sso_troubleshooting.md b/docs/sso_troubleshooting.md new file mode 100644 index 000000000..91740913f --- /dev/null +++ b/docs/sso_troubleshooting.md @@ -0,0 +1,400 @@ +# SSO Group Mapping Troubleshooting Guide + +## Overview + +This guide helps you diagnose and resolve issues with SSO (Single Sign-On) group mappings, particularly when using Okta Push Groups or similar SCIM-based group provisioning systems. + +## Common Issue: Stale externalId / Orphaned Mapping + +### Symptoms + +When attempting to push a group from your SSO provider (e.g., Okta) to Atlan, you receive an error similar to: + +``` +Failed on XX-XX-XXXX XX:XX:XX PM UTC: Unable to update Group Push mapping target App group +``: Error while trying to get the group `` with the externalId +`` and id `com.saasure.db.dto.platform.entity.AppGroup@...` +``` + +### Key Indicators + +- āœ… Other groups with similar settings push successfully +- āœ… The error consistently mentions a specific `externalId` +- āœ… The issue persists even after: + - Removing the SSO group mapping in Atlan + - Creating a new Atlan group with a matching name + - Unlinking and re-linking the group from Okta + +### Root Cause + +This issue occurs when there is a **stale mapping** between your SSO provider (Okta) and Atlan. Specifically: + +1. An SSO group mapping was previously created, establishing a link between an Okta group and an Atlan group +2. The Atlan group was deleted or recreated, or the SSO mapping was removed +3. However, Okta still has a record of the old `externalId` for that group +4. When Okta tries to push the group again, it uses the old `externalId` which Atlan cannot resolve + +The SSO group mapping (identity provider mapper in Keycloak) is stored separately from the group itself, and deleting one doesn't automatically clean up the other. + +## Diagnosis + +### Using the Diagnostic Script + +We provide a diagnostic utility to identify orphaned SSO group mappings. This script is part of the Python SDK. + +#### Prerequisites + +```bash +# Install the Atlan Python SDK +pip install pyatlan + +# Set up environment variables +export ATLAN_BASE_URL="https://your-tenant.atlan.com" +export ATLAN_API_KEY="your-api-key" +``` + +#### Diagnose All Mappings + +To check for orphaned mappings across all groups: + +```bash +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta +``` + +#### Diagnose Specific Group + +To check a specific group that's failing: + +```bash +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta \ + --group-name grpAtlanProdWorkflowAdmin +``` + +#### List All SSO Mappings + +To see all current SSO group mappings: + +```bash +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode list \ + --sso-alias okta +``` + +### Manual Diagnosis + +If you prefer to diagnose manually using the Python SDK: + +```python +from pyatlan.client.atlan import AtlanClient + +# Initialize client +client = AtlanClient() + +# Get all SSO group mappings +sso_alias = "okta" # or "azure", "jumpcloud", etc. +mappings = client.sso.get_all_group_mappings(sso_alias=sso_alias) + +# Get all Atlan groups +groups_response = client.group.get_all() +groups = {g.id: g for g in groups_response if g.id} + +# Check each mapping +for mapping in mappings: + if not mapping.name or not mapping.config: + continue + + # Extract group ID from mapping name (format: --) + mapping_group_id = mapping.name.split("--")[0] if "--" in mapping.name else None + + # Check if group still exists + if mapping_group_id and mapping_group_id not in groups: + print(f"āš ļø ORPHANED: Mapping {mapping.id} references non-existent group {mapping_group_id}") + print(f" SSO Group: {mapping.config.attribute_value}") + print(f" Atlan Group: {mapping.config.group_name}") +``` + +## Resolution + +### Option 1: Automated Cleanup (Recommended) + +Use the diagnostic script to automatically clean up orphaned mappings: + +```bash +# Interactive cleanup (prompts for confirmation for each mapping) +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta + +# Non-interactive cleanup (automatically deletes all orphaned mappings) +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta \ + --non-interactive +``` + +**āš ļø Warning:** Non-interactive mode will delete ALL orphaned mappings without confirmation. Use with caution! + +### Option 2: Manual Cleanup + +#### Using the Python SDK + +```python +from pyatlan.client.atlan import AtlanClient + +client = AtlanClient() + +# Find the orphaned mapping ID (from diagnosis step) +orphaned_mapping_id = "your-mapping-id-here" +sso_alias = "okta" + +# Delete the orphaned mapping +client.sso.delete_group_mapping( + sso_alias=sso_alias, + group_map_id=orphaned_mapping_id +) + +print(f"āœ… Deleted orphaned mapping {orphaned_mapping_id}") +``` + +#### Using the REST API + +If you need to clean up mappings directly via API: + +```bash +# Get your API token and base URL +ATLAN_BASE_URL="https://your-tenant.atlan.com" +ATLAN_API_KEY="your-api-key" +SSO_ALIAS="okta" +MAPPING_ID="mapping-id-to-delete" + +# Delete the mapping +curl -X POST \ + "${ATLAN_BASE_URL}/api/service/auth/admin/realms/default/identity-provider/instances/${SSO_ALIAS}/mappers/${MAPPING_ID}/delete" \ + -H "Authorization: Bearer ${ATLAN_API_KEY}" \ + -H "Content-Type: application/json" +``` + +### Post-Cleanup Steps + +After cleaning up the orphaned mappings: + +1. **In Atlan:** + - Verify the Atlan group exists: Settings → Access → Groups + - If needed, create the group: Settings → Access → Groups → + New Group + +2. **In Okta:** + - Go to Applications → Atlan → Push Groups + - **Unlink** the affected group (do not delete in Okta) + - **Re-link** the group using "Link Group" (NOT "Create") + - Choose the existing Atlan group from the dropdown + +3. **Test:** + - Try pushing the group again + - The error should now be resolved + - Verify group membership is correctly synced + +## Prevention + +To prevent this issue in the future: + +### 1. Proper Group Deletion Workflow + +When deleting a group that has SSO mapping: + +```python +from pyatlan.client.atlan import AtlanClient + +client = AtlanClient() +sso_alias = "okta" +group_id = "group-guid-here" + +# Step 1: Find and delete SSO mappings first +all_mappings = client.sso.get_all_group_mappings(sso_alias=sso_alias) +for mapping in all_mappings: + if mapping.name and group_id in mapping.name: + print(f"Deleting SSO mapping: {mapping.id}") + client.sso.delete_group_mapping( + sso_alias=sso_alias, + group_map_id=mapping.id + ) + +# Step 2: Then delete the group +client.group.purge(guid=group_id) +print(f"āœ… Group and SSO mappings deleted successfully") +``` + +### 2. Re-linking Groups + +When re-linking a group after unlinking: + +- āœ… **DO:** Use "Link Group" and select the existing Atlan group +- āŒ **DON'T:** Use "Create" which creates a new group and new mapping +- āŒ **DON'T:** Delete and recreate the Atlan group without cleaning up SSO mappings + +### 3. Regular Audits + +Periodically run the diagnostic script to identify orphaned mappings: + +```bash +# Add to your maintenance routine +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta +``` + +## Advanced Scenarios + +### Multiple SSO Providers + +If you have multiple SSO providers (e.g., Okta and Azure AD), run diagnostics for each: + +```bash +for sso in okta azure jumpcloud; do + echo "Checking $sso..." + python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias $sso +done +``` + +### Scripted Cleanup + +For large-scale cleanups across multiple tenants: + +```python +from pyatlan.client.atlan import AtlanClient +from pyatlan.samples.sso.diagnose_orphaned_group_mappings import SSOGroupMappingDiagnostic + +# Initialize +client = AtlanClient() +diagnostic = SSOGroupMappingDiagnostic(client) + +# Run diagnosis +results = diagnostic.diagnose_orphaned_mappings(sso_alias="okta") + +# Automated cleanup (be careful!) +if results["orphaned"]: + print(f"Found {len(results['orphaned'])} orphaned mappings") + deleted = diagnostic.cleanup_orphaned_mappings( + sso_alias="okta", + interactive=False # Automatic deletion + ) + print(f"Deleted {deleted} mappings") +``` + +### Monitoring and Alerting + +Set up monitoring to detect this issue early: + +```python +import logging +from pyatlan.client.atlan import AtlanClient +from pyatlan.samples.sso.diagnose_orphaned_group_mappings import SSOGroupMappingDiagnostic + +def check_orphaned_mappings(sso_alias: str) -> bool: + """ + Check for orphaned SSO mappings and return True if any found. + Suitable for integration with monitoring systems. + """ + client = AtlanClient() + diagnostic = SSOGroupMappingDiagnostic(client) + + results = diagnostic.diagnose_orphaned_mappings(sso_alias=sso_alias) + + orphaned_count = len(results["orphaned"]) + + if orphaned_count > 0: + logging.warning(f"Found {orphaned_count} orphaned SSO mappings for {sso_alias}") + return True + + return False + +# Usage in monitoring script +if check_orphaned_mappings("okta"): + # Send alert to your monitoring system + # e.g., send_pagerduty_alert(), send_slack_message(), etc. + pass +``` + +## API Reference + +### SSO Client Methods + +All SSO operations are available through `client.sso`: + +```python +from pyatlan.client.atlan import AtlanClient +from pyatlan.model.group import AtlanGroup + +client = AtlanClient() + +# Create a new SSO group mapping +group = client.group.get_by_name(alias="MyGroup").records[0] +mapping = client.sso.create_group_mapping( + sso_alias="okta", + atlan_group=group, + sso_group_name="okta-group-name" +) + +# Get all SSO group mappings +all_mappings = client.sso.get_all_group_mappings(sso_alias="okta") + +# Get specific SSO group mapping +mapping = client.sso.get_group_mapping( + sso_alias="okta", + group_map_id="mapping-id" +) + +# Update SSO group mapping +updated = client.sso.update_group_mapping( + sso_alias="okta", + atlan_group=group, + group_map_id="mapping-id", + sso_group_name="new-okta-group-name" +) + +# Delete SSO group mapping +client.sso.delete_group_mapping( + sso_alias="okta", + group_map_id="mapping-id" +) +``` + +## Support Escalation + +If the above steps don't resolve your issue, escalate to Atlan Support with: + +1. **Environment Details:** + - Tenant URL + - SSO provider (Okta, Azure AD, etc.) + - Affected group name(s) + +2. **Diagnostic Output:** + ```bash + python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta > diagnostic_output.txt + ``` + +3. **Error Messages:** + - Full error message from Okta/SSO provider + - Any relevant logs from Atlan + +4. **Steps Already Taken:** + - List what you've already tried + - Whether cleanup script was run + - Results of cleanup attempts + +## Related Documentation + +- [SSO Group Mappings API](../client/sso.rst) +- [Group Management API](../client/group.rst) +- [Atlan SSO Configuration](https://ask.atlan.com/hc/en-us/articles/sso-setup) + +## Changelog + +- **2025-02-11:** Initial documentation for LINTEST-425 (Okta Push Groups stale externalId issue) diff --git a/pyatlan/client/aio/sso.py b/pyatlan/client/aio/sso.py index d68f4e04f..23a168b4f 100644 --- a/pyatlan/client/aio/sso.py +++ b/pyatlan/client/aio/sso.py @@ -3,6 +3,7 @@ from __future__ import annotations +import logging from typing import TYPE_CHECKING, List from pydantic.v1 import validate_arguments @@ -23,6 +24,8 @@ if TYPE_CHECKING: pass +logger = logging.getLogger(__name__) + class AsyncSSOClient: """ @@ -63,14 +66,35 @@ async def create_group_mapping( :param atlan_group: existing Atlan group. :param sso_group_name: name of the SSO group. :raises AtlanError: on any error during API invocation. + :raises InvalidRequestError: if the group doesn't have required fields or mapping already exists. :returns: created SSO group mapping instance. """ + # Validate the group has required fields + if not atlan_group.id: + raise ErrorCode.SSO_MAPPING_VALIDATION_ERROR.exception_with_parameters( + "Atlan group must have an 'id' field populated", + sso_alias, + ) + if not atlan_group.name: + raise ErrorCode.SSO_MAPPING_VALIDATION_ERROR.exception_with_parameters( + "Atlan group must have a 'name' field populated", + sso_alias, + ) + + logger.info( + f"Creating SSO group mapping: {atlan_group.alias} (ID: {atlan_group.id}) " + f"<-> {sso_group_name} (SSO: {sso_alias})" + ) + await self._check_existing_group_mappings(sso_alias, atlan_group) endpoint, request_obj = SSOCreateGroupMapping.prepare_request( sso_alias, atlan_group, sso_group_name ) raw_json = await self._client._call_api(endpoint, request_obj=request_obj) - return SSOCreateGroupMapping.process_response(raw_json) + result = SSOCreateGroupMapping.process_response(raw_json) + + logger.info(f"Successfully created SSO group mapping with ID: {result.id}") + return result @validate_arguments async def update_group_mapping( @@ -129,14 +153,29 @@ async def get_group_mapping(self, sso_alias: str, group_map_id: str) -> SSOMappe async def delete_group_mapping(self, sso_alias: str, group_map_id: str) -> None: """ Deletes an existing Atlan SSO group mapping. + + Note: This only deletes the SSO mapping (identity provider mapper). + If you're experiencing issues with Okta Push Groups failing with + stale externalId errors, you may need to run the diagnostic script + to identify and clean up orphaned mappings. :param sso_alias: name of the SSO provider. :param group_map_id: existing SSO group map identifier. :raises AtlanError: on any error during API invocation. :returns: an empty response (`None`). """ + logger.info(f"Deleting SSO group mapping: {group_map_id} (SSO: {sso_alias})") + endpoint, request_obj = SSODeleteGroupMapping.prepare_request( sso_alias, group_map_id ) raw_json = await self._client._call_api(endpoint, request_obj=request_obj) + + logger.info(f"Successfully deleted SSO group mapping: {group_map_id}") + logger.debug( + "If you're experiencing Okta Push Groups issues with stale externalId, " + "run: python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings " + f"--mode diagnose --sso-alias {sso_alias}" + ) + return raw_json diff --git a/pyatlan/client/sso.py b/pyatlan/client/sso.py index 5c96f23ba..64fd449c2 100644 --- a/pyatlan/client/sso.py +++ b/pyatlan/client/sso.py @@ -1,3 +1,4 @@ +import logging from typing import List from pydantic.v1 import validate_arguments @@ -15,6 +16,8 @@ from pyatlan.model.group import AtlanGroup from pyatlan.model.sso import SSOMapper +logger = logging.getLogger(__name__) + class SSOClient: """ @@ -55,14 +58,35 @@ def create_group_mapping( :param atlan_group: existing Atlan group. :param sso_group_name: name of the SSO group. :raises AtlanError: on any error during API invocation. + :raises InvalidRequestError: if the group doesn't have required fields or mapping already exists. :returns: created SSO group mapping instance. """ + # Validate the group has required fields + if not atlan_group.id: + raise ErrorCode.SSO_MAPPING_VALIDATION_ERROR.exception_with_parameters( + "Atlan group must have an 'id' field populated", + sso_alias, + ) + if not atlan_group.name: + raise ErrorCode.SSO_MAPPING_VALIDATION_ERROR.exception_with_parameters( + "Atlan group must have a 'name' field populated", + sso_alias, + ) + + logger.info( + f"Creating SSO group mapping: {atlan_group.alias} (ID: {atlan_group.id}) " + f"<-> {sso_group_name} (SSO: {sso_alias})" + ) + self._check_existing_group_mappings(sso_alias, atlan_group) endpoint, request_obj = SSOCreateGroupMapping.prepare_request( sso_alias, atlan_group, sso_group_name ) raw_json = self._client._call_api(endpoint, request_obj=request_obj) - return SSOCreateGroupMapping.process_response(raw_json) + result = SSOCreateGroupMapping.process_response(raw_json) + + logger.info(f"Successfully created SSO group mapping with ID: {result.id}") + return result @validate_arguments def update_group_mapping( @@ -121,14 +145,29 @@ def get_group_mapping(self, sso_alias: str, group_map_id: str) -> SSOMapper: def delete_group_mapping(self, sso_alias: str, group_map_id: str) -> None: """ Deletes an existing Atlan SSO group mapping. + + Note: This only deletes the SSO mapping (identity provider mapper). + If you're experiencing issues with Okta Push Groups failing with + stale externalId errors, you may need to run the diagnostic script + to identify and clean up orphaned mappings. :param sso_alias: name of the SSO provider. :param group_map_id: existing SSO group map identifier. :raises AtlanError: on any error during API invocation. :returns: an empty response (`None`). """ + logger.info(f"Deleting SSO group mapping: {group_map_id} (SSO: {sso_alias})") + endpoint, request_obj = SSODeleteGroupMapping.prepare_request( sso_alias, group_map_id ) raw_json = self._client._call_api(endpoint, request_obj=request_obj) + + logger.info(f"Successfully deleted SSO group mapping: {group_map_id}") + logger.debug( + "If you're experiencing Okta Push Groups issues with stale externalId, " + "run: python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings " + f"--mode diagnose --sso-alias {sso_alias}" + ) + return raw_json diff --git a/pyatlan/errors.py b/pyatlan/errors.py index 9fd801a30..bd3bd7544 100644 --- a/pyatlan/errors.py +++ b/pyatlan/errors.py @@ -539,6 +539,24 @@ class ErrorCode(Enum): "You can use SSOClient.update_group_mapping() to update the existing group mapping.", InvalidRequestError, ) + SSO_GROUP_NOT_FOUND = ( + 404, + "ATLAN-PYTHON-404-031", + "Atlan group '{0}' (ID: {1}) not found. This may indicate an orphaned SSO group mapping.", + "The SSO mapping references a group that no longer exists. " + "Run the diagnostic script to identify and clean up orphaned mappings: " + "python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings --mode diagnose --sso-alias {2}", + NotFoundError, + ) + SSO_MAPPING_VALIDATION_ERROR = ( + 400, + "ATLAN-PYTHON-400-059", + "SSO mapping validation failed: {0}", + "Ensure the Atlan group exists before creating an SSO mapping. " + "You may have orphaned mappings that need cleanup. " + "Run: python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings --mode diagnose --sso-alias {1}", + InvalidRequestError, + ) INVALID_UPLOAD_FILE_PATH = ( 400, "ATLAN-PYTHON-400-059", diff --git a/pyatlan/samples/sso/README.md b/pyatlan/samples/sso/README.md new file mode 100644 index 000000000..b4e782b39 --- /dev/null +++ b/pyatlan/samples/sso/README.md @@ -0,0 +1,89 @@ +# SSO Group Mapping Utilities + +This directory contains utilities for managing SSO (Single Sign-On) group mappings in Atlan. + +## Available Tools + +### Diagnose Orphaned Group Mappings + +**File:** `diagnose_orphaned_group_mappings.py` + +A diagnostic and cleanup utility for identifying and resolving orphaned SSO group mappings that can cause Okta Push Groups to fail with stale `externalId` errors. + +#### Quick Start + +```bash +# Set up environment +export ATLAN_BASE_URL="https://your-tenant.atlan.com" +export ATLAN_API_KEY="your-api-key" + +# Diagnose all SSO group mappings +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta + +# Diagnose a specific group +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode diagnose \ + --sso-alias okta \ + --group-name grpAtlanProdWorkflowAdmin + +# Clean up orphaned mappings (interactive) +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode cleanup \ + --sso-alias okta + +# List all SSO group mappings +python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings \ + --mode list \ + --sso-alias okta +``` + +#### Common Use Cases + +1. **Okta Push Groups Failing with Stale externalId** + - Run diagnostic to identify orphaned mappings + - Clean up orphaned mappings + - In Okta, unlink and re-link the group (use "Link Group", not "Create") + +2. **Regular Maintenance** + - Periodically run diagnostic to catch orphaned mappings early + - Schedule as part of your regular Atlan maintenance routine + +3. **Group Migration or Cleanup** + - Before deleting groups, check for associated SSO mappings + - Clean up mappings before deleting the group to avoid orphans + +#### Options + +- `--mode`: Operation mode + - `diagnose`: Check for orphaned mappings (recommended first step) + - `cleanup`: Remove orphaned mappings (interactive by default) + - `list`: Display all SSO group mappings + +- `--sso-alias`: SSO provider alias (e.g., `okta`, `azure`, `jumpcloud`) + +- `--group-name`: (Optional) Check only a specific group + +- `--non-interactive`: Run cleanup without prompting (use with caution!) + +## Documentation + +For detailed information about SSO group mapping troubleshooting, see: +- [SSO Troubleshooting Guide](../../../docs/sso_troubleshooting.md) +- [SSO Client API Documentation](../../../docs/client/sso.rst) + +## Support + +If you encounter issues or need help: + +1. Run the diagnostic script and save the output +2. Check the [SSO Troubleshooting Guide](../../../docs/sso_troubleshooting.md) +3. If the issue persists, contact Atlan Support with: + - Diagnostic output + - Full error messages + - Steps you've already tried + +## Related Issues + +- **LINTEST-425**: Okta Push Groups stale externalId / orphaned mapping issue diff --git a/pyatlan/samples/sso/__init__.py b/pyatlan/samples/sso/__init__.py new file mode 100644 index 000000000..746515783 --- /dev/null +++ b/pyatlan/samples/sso/__init__.py @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2025 Atlan Pte. Ltd. +""" +SSO (Single Sign-On) utilities for Atlan. + +This module provides utilities for managing and troubleshooting +SSO group mappings in Atlan. +""" diff --git a/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py b/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py new file mode 100644 index 000000000..b5bd03604 --- /dev/null +++ b/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py @@ -0,0 +1,369 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2025 Atlan Pte. Ltd. +""" +Diagnostic and cleanup utility for orphaned SSO group mappings. + +This script helps diagnose and resolve issues where Okta Push Groups fails +with stale externalId errors. This typically happens when: +1. An SSO group mapping is deleted but Okta still references the old externalId +2. A group is recreated with the same name but Okta pushes with old externalId +3. Unlinking and re-linking groups doesn't clear the orphaned mapping + +Usage: + # Diagnose orphaned mappings + python diagnose_orphaned_group_mappings.py --mode diagnose --sso-alias + + # Diagnose specific group + python diagnose_orphaned_group_mappings.py --mode diagnose --sso-alias --group-name + + # Clean up orphaned mappings (interactive) + python diagnose_orphaned_group_mappings.py --mode cleanup --sso-alias + + # List all SSO group mappings + python diagnose_orphaned_group_mappings.py --mode list --sso-alias + +Environment Variables: + ATLAN_BASE_URL: Base URL of your Atlan tenant (e.g., https://apex.atlan.com) + ATLAN_API_KEY: API key for authentication +""" + +import argparse +import logging +import sys +from typing import Dict, List, Optional, Set + +from pyatlan.client.atlan import AtlanClient +from pyatlan.errors import AtlanError +from pyatlan.model.group import AtlanGroup +from pyatlan.model.sso import SSOMapper + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + + +class SSOGroupMappingDiagnostic: + """Utility to diagnose and clean up orphaned SSO group mappings.""" + + def __init__(self, client: AtlanClient): + self.client = client + + def get_all_groups(self) -> Dict[str, AtlanGroup]: + """ + Retrieve all Atlan groups. + + :returns: Dictionary mapping group ID to AtlanGroup + """ + logger.info("Retrieving all Atlan groups...") + groups = {} + + try: + response = self.client.group.get_all(limit=100) + for group in response: + if group.id: + groups[group.id] = group + logger.info(f"Found {len(groups)} Atlan groups") + except AtlanError as e: + logger.error(f"Error retrieving groups: {e}") + raise + + return groups + + def get_all_sso_mappings(self, sso_alias: str) -> List[SSOMapper]: + """ + Retrieve all SSO group mappings for a given SSO provider. + + :param sso_alias: SSO provider alias (e.g., 'okta', 'azure', 'jumpcloud') + :returns: List of SSO group mappings + """ + logger.info(f"Retrieving SSO group mappings for '{sso_alias}'...") + + try: + mappings = self.client.sso.get_all_group_mappings(sso_alias=sso_alias) + logger.info(f"Found {len(mappings)} SSO group mappings") + return mappings + except AtlanError as e: + logger.error(f"Error retrieving SSO mappings: {e}") + raise + + def diagnose_orphaned_mappings( + self, sso_alias: str, target_group_name: Optional[str] = None + ) -> Dict[str, List[SSOMapper]]: + """ + Diagnose orphaned SSO group mappings. + + An orphaned mapping is one where: + - The mapping references a group ID that doesn't exist + - The mapping's group name doesn't match the actual group name + + :param sso_alias: SSO provider alias + :param target_group_name: Optional specific group name to check + :returns: Dictionary with 'orphaned' and 'valid' mapping lists + """ + logger.info("=" * 80) + logger.info("DIAGNOSING SSO GROUP MAPPINGS") + logger.info("=" * 80) + + groups = self.get_all_groups() + mappings = self.get_all_sso_mappings(sso_alias) + + # Create reverse lookup: group name -> group ID + group_name_to_id = {g.name: g.id for g in groups.values() if g.name} + + orphaned = [] + valid = [] + suspicious = [] + + logger.info("\nAnalyzing mappings...") + logger.info("-" * 80) + + for mapping in mappings: + if not mapping.name or not mapping.config or not mapping.config.group_name: + logger.warning(f"āš ļø Mapping {mapping.id} has incomplete data") + suspicious.append(mapping) + continue + + # Extract group ID from mapping name (format: --) + mapping_group_id = mapping.name.split("--")[0] if "--" in mapping.name else None + mapped_group_name = mapping.config.group_name + + # Skip if target group specified and this isn't it + if target_group_name and mapped_group_name != target_group_name: + continue + + # Check if this is orphaned + is_orphaned = False + orphan_reason = [] + + # Case 1: Group ID in mapping doesn't exist + if mapping_group_id and mapping_group_id not in groups: + is_orphaned = True + orphan_reason.append(f"Group ID '{mapping_group_id}' not found") + + # Case 2: Group name doesn't match any existing group + if mapped_group_name not in group_name_to_id: + is_orphaned = True + orphan_reason.append(f"Group name '{mapped_group_name}' not found") + + # Case 3: Group ID in mapping doesn't match current group with that name + elif mapping_group_id and group_name_to_id.get(mapped_group_name) != mapping_group_id: + is_orphaned = True + current_id = group_name_to_id.get(mapped_group_name) + orphan_reason.append( + f"Group ID mismatch: mapping has '{mapping_group_id}' " + f"but current group has '{current_id}'" + ) + + if is_orphaned: + orphaned.append(mapping) + logger.warning(f"šŸ”“ ORPHANED: {mapping.id}") + logger.warning(f" Mapping Name: {mapping.name}") + logger.warning(f" Group Name: {mapped_group_name}") + logger.warning(f" SSO Group: {mapping.config.attribute_value}") + logger.warning(f" Reason: {', '.join(orphan_reason)}") + logger.warning("") + else: + valid.append(mapping) + if target_group_name: + logger.info(f"āœ… VALID: {mapping.id}") + logger.info(f" Mapping Name: {mapping.name}") + logger.info(f" Group Name: {mapped_group_name}") + logger.info(f" SSO Group: {mapping.config.attribute_value}") + logger.info("") + + logger.info("=" * 80) + logger.info("SUMMARY") + logger.info("=" * 80) + logger.info(f"Total mappings: {len(mappings)}") + logger.info(f"āœ… Valid mappings: {len(valid)}") + logger.info(f"šŸ”“ Orphaned mappings: {len(orphaned)}") + logger.info(f"āš ļø Suspicious mappings: {len(suspicious)}") + logger.info("=" * 80) + + return { + "orphaned": orphaned, + "valid": valid, + "suspicious": suspicious, + } + + def list_all_mappings(self, sso_alias: str) -> None: + """ + List all SSO group mappings in a readable format. + + :param sso_alias: SSO provider alias + """ + logger.info("=" * 80) + logger.info(f"SSO GROUP MAPPINGS FOR '{sso_alias}'") + logger.info("=" * 80) + + mappings = self.get_all_sso_mappings(sso_alias) + + if not mappings: + logger.info("No SSO group mappings found") + return + + for i, mapping in enumerate(mappings, 1): + logger.info(f"\n{i}. Mapping ID: {mapping.id}") + logger.info(f" Mapping Name: {mapping.name}") + if mapping.config: + logger.info(f" Atlan Group: {mapping.config.group_name}") + logger.info(f" SSO Group: {mapping.config.attribute_value}") + logger.info(f" Sync Mode: {mapping.config.sync_mode}") + logger.info(f" Provider: {mapping.identity_provider_alias}") + logger.info(f" Mapper Type: {mapping.identity_provider_mapper}") + + logger.info("\n" + "=" * 80) + + def cleanup_orphaned_mappings( + self, sso_alias: str, interactive: bool = True + ) -> int: + """ + Clean up orphaned SSO group mappings. + + :param sso_alias: SSO provider alias + :param interactive: If True, ask for confirmation before each deletion + :returns: Number of mappings deleted + """ + logger.info("=" * 80) + logger.info("CLEANUP MODE") + logger.info("=" * 80) + + results = self.diagnose_orphaned_mappings(sso_alias) + orphaned = results["orphaned"] + + if not orphaned: + logger.info("\nāœ… No orphaned mappings found. Nothing to clean up!") + return 0 + + logger.warning(f"\nāš ļø Found {len(orphaned)} orphaned mapping(s) to clean up") + + deleted_count = 0 + + for mapping in orphaned: + if interactive: + logger.info("\n" + "-" * 80) + logger.info(f"Mapping ID: {mapping.id}") + logger.info(f"Mapping Name: {mapping.name}") + if mapping.config: + logger.info(f"Group Name: {mapping.config.group_name}") + logger.info(f"SSO Group: {mapping.config.attribute_value}") + + response = input("\nDelete this mapping? (y/n): ").strip().lower() + + if response != 'y': + logger.info("Skipped") + continue + + try: + logger.info(f"Deleting mapping {mapping.id}...") + self.client.sso.delete_group_mapping( + sso_alias=sso_alias, + group_map_id=mapping.id, + ) + logger.info(f"āœ… Successfully deleted mapping {mapping.id}") + deleted_count += 1 + except AtlanError as e: + logger.error(f"āŒ Error deleting mapping {mapping.id}: {e}") + + logger.info("\n" + "=" * 80) + logger.info(f"Cleanup complete. Deleted {deleted_count} orphaned mapping(s)") + logger.info("=" * 80) + + return deleted_count + + +def main(): + """Main entry point for the diagnostic script.""" + parser = argparse.ArgumentParser( + description="Diagnose and clean up orphaned SSO group mappings", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + + parser.add_argument( + "--mode", + required=True, + choices=["diagnose", "cleanup", "list"], + help="Operation mode: diagnose (check for issues), cleanup (remove orphaned mappings), or list (show all mappings)", + ) + + parser.add_argument( + "--sso-alias", + required=True, + help="SSO provider alias (e.g., 'okta', 'azure', 'jumpcloud')", + ) + + parser.add_argument( + "--group-name", + help="Optional: Specific Atlan group name to diagnose", + ) + + parser.add_argument( + "--non-interactive", + action="store_true", + help="Run cleanup without prompting for confirmation (dangerous!)", + ) + + args = parser.parse_args() + + # Initialize client + try: + client = AtlanClient() + logger.info(f"Connected to Atlan at {client.base_url}") + except Exception as e: + logger.error(f"Failed to initialize Atlan client: {e}") + logger.error("Make sure ATLAN_BASE_URL and ATLAN_API_KEY are set") + sys.exit(1) + + # Initialize diagnostic tool + diagnostic = SSOGroupMappingDiagnostic(client) + + # Execute requested mode + try: + if args.mode == "list": + diagnostic.list_all_mappings(args.sso_alias) + + elif args.mode == "diagnose": + results = diagnostic.diagnose_orphaned_mappings( + sso_alias=args.sso_alias, + target_group_name=args.group_name, + ) + + if results["orphaned"]: + logger.info("\nšŸ’” TIP: Run with --mode cleanup to remove orphaned mappings") + sys.exit(1) # Exit with error code if orphaned mappings found + + elif args.mode == "cleanup": + if args.non_interactive: + logger.warning("āš ļø Running in non-interactive mode!") + logger.warning("āš ļø All orphaned mappings will be deleted automatically!") + response = input("Are you sure you want to continue? (yes/no): ").strip().lower() + if response != "yes": + logger.info("Cleanup cancelled") + sys.exit(0) + + deleted = diagnostic.cleanup_orphaned_mappings( + sso_alias=args.sso_alias, + interactive=not args.non_interactive, + ) + + if deleted > 0: + logger.info("\nāœ… Cleanup complete!") + logger.info("\nšŸ’” NEXT STEPS:") + logger.info("1. In Okta, try unlinking and re-linking the affected group") + logger.info("2. Use 'Link Group' (not 'Create') when re-linking") + logger.info("3. The group push should now work without the stale externalId error") + + except KeyboardInterrupt: + logger.info("\n\nOperation cancelled by user") + sys.exit(130) + except Exception as e: + logger.error(f"\nāŒ Unexpected error: {e}", exc_info=True) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tests/unit/test_sso_diagnostic.py b/tests/unit/test_sso_diagnostic.py new file mode 100644 index 000000000..9b4295668 --- /dev/null +++ b/tests/unit/test_sso_diagnostic.py @@ -0,0 +1,302 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2025 Atlan Pte. Ltd. +from unittest.mock import Mock, patch + +import pytest + +from pyatlan.client.atlan import AtlanClient +from pyatlan.model.group import AtlanGroup, GroupResponse +from pyatlan.model.sso import SSOMapper, SSOMapperConfig +from pyatlan.samples.sso.diagnose_orphaned_group_mappings import ( + SSOGroupMappingDiagnostic, +) + + +@pytest.fixture +def mock_client(): + """Create a mock AtlanClient for testing.""" + return Mock(spec=AtlanClient) + + +@pytest.fixture +def diagnostic(mock_client): + """Create a SSOGroupMappingDiagnostic instance.""" + return SSOGroupMappingDiagnostic(mock_client) + + +@pytest.fixture +def sample_groups(): + """Create sample Atlan groups for testing.""" + return { + "group-id-1": AtlanGroup( + id="group-id-1", + name="test_group_1", + alias="Test Group 1", + ), + "group-id-2": AtlanGroup( + id="group-id-2", + name="test_group_2", + alias="Test Group 2", + ), + } + + +@pytest.fixture +def sample_mappings(): + """Create sample SSO mappings for testing.""" + return [ + # Valid mapping for group-id-1 + SSOMapper( + id="mapping-1", + name="group-id-1--1234567890", + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=SSOMapperConfig( + group_name="test_group_1", + attribute_value="okta-group-1", + sync_mode="FORCE", + attribute_name="memberOf", + ), + ), + # Valid mapping for group-id-2 + SSOMapper( + id="mapping-2", + name="group-id-2--1234567891", + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=SSOMapperConfig( + group_name="test_group_2", + attribute_value="okta-group-2", + sync_mode="FORCE", + attribute_name="memberOf", + ), + ), + # Orphaned mapping - group doesn't exist + SSOMapper( + id="mapping-3", + name="group-id-999--1234567892", + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=SSOMapperConfig( + group_name="deleted_group", + attribute_value="okta-group-3", + sync_mode="FORCE", + attribute_name="memberOf", + ), + ), + # Orphaned mapping - group ID mismatch + SSOMapper( + id="mapping-4", + name="group-id-old--1234567893", + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=SSOMapperConfig( + group_name="test_group_1", # Points to existing group + attribute_value="okta-group-4", + sync_mode="FORCE", + attribute_name="memberOf", + ), + ), + ] + + +def test_get_all_groups(diagnostic, mock_client, sample_groups): + """Test retrieving all Atlan groups.""" + # Setup mock + mock_response = Mock(spec=GroupResponse) + mock_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_response + + # Execute + result = diagnostic.get_all_groups() + + # Verify + assert len(result) == 2 + assert "group-id-1" in result + assert "group-id-2" in result + assert result["group-id-1"].name == "test_group_1" + mock_client.group.get_all.assert_called_once() + + +def test_get_all_sso_mappings(diagnostic, mock_client, sample_mappings): + """Test retrieving all SSO group mappings.""" + # Setup mock + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + + # Execute + result = diagnostic.get_all_sso_mappings("okta") + + # Verify + assert len(result) == 4 + assert result[0].id == "mapping-1" + mock_client.sso.get_all_group_mappings.assert_called_once_with(sso_alias="okta") + + +def test_diagnose_finds_orphaned_mappings( + diagnostic, mock_client, sample_groups, sample_mappings +): + """Test that diagnose correctly identifies orphaned mappings.""" + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + + # Execute + results = diagnostic.diagnose_orphaned_mappings("okta") + + # Verify + assert len(results["valid"]) == 2 + assert len(results["orphaned"]) == 2 + + # Check valid mappings + valid_ids = {m.id for m in results["valid"]} + assert "mapping-1" in valid_ids + assert "mapping-2" in valid_ids + + # Check orphaned mappings + orphaned_ids = {m.id for m in results["orphaned"]} + assert "mapping-3" in orphaned_ids # Group doesn't exist + assert "mapping-4" in orphaned_ids # Group ID mismatch + + +def test_diagnose_with_target_group_filter( + diagnostic, mock_client, sample_groups, sample_mappings +): + """Test diagnosing a specific group.""" + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + + # Execute - filter for test_group_1 + results = diagnostic.diagnose_orphaned_mappings("okta", target_group_name="test_group_1") + + # Verify - should only check mappings for test_group_1 + # mapping-1 is valid, mapping-4 is orphaned (both reference test_group_1) + assert len(results["valid"]) == 1 + assert len(results["orphaned"]) == 1 + assert results["valid"][0].id == "mapping-1" + assert results["orphaned"][0].id == "mapping-4" + + +def test_diagnose_handles_incomplete_mappings(diagnostic, mock_client, sample_groups): + """Test handling of mappings with incomplete data.""" + # Create mapping with missing config + incomplete_mapping = SSOMapper( + id="mapping-incomplete", + name=None, # Missing name + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=None, # Missing config + ) + + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = [incomplete_mapping] + + # Execute + results = diagnostic.diagnose_orphaned_mappings("okta") + + # Verify - incomplete mapping should be in suspicious list + assert len(results["suspicious"]) == 1 + assert results["suspicious"][0].id == "mapping-incomplete" + + +def test_cleanup_orphaned_mappings_interactive( + diagnostic, mock_client, sample_groups, sample_mappings +): + """Test interactive cleanup of orphaned mappings.""" + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + mock_client.sso.delete_group_mapping.return_value = None + + # Mock user input - say yes to first, no to second + with patch("builtins.input", side_effect=["y", "n"]): + # Execute + deleted_count = diagnostic.cleanup_orphaned_mappings("okta", interactive=True) + + # Verify - should delete only the first orphaned mapping + assert deleted_count == 1 + mock_client.sso.delete_group_mapping.assert_called_once() + + +def test_cleanup_orphaned_mappings_non_interactive( + diagnostic, mock_client, sample_groups, sample_mappings +): + """Test non-interactive cleanup of all orphaned mappings.""" + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + mock_client.sso.delete_group_mapping.return_value = None + + # Execute + deleted_count = diagnostic.cleanup_orphaned_mappings("okta", interactive=False) + + # Verify - should delete both orphaned mappings + assert deleted_count == 2 + assert mock_client.sso.delete_group_mapping.call_count == 2 + + +def test_cleanup_no_orphaned_mappings( + diagnostic, mock_client, sample_groups +): + """Test cleanup when there are no orphaned mappings.""" + # Create only valid mappings + valid_mapping = SSOMapper( + id="mapping-1", + name="group-id-1--1234567890", + identity_provider_mapper="saml-group-idp-mapper", + identity_provider_alias="okta", + config=SSOMapperConfig( + group_name="test_group_1", + attribute_value="okta-group-1", + sync_mode="FORCE", + attribute_name="memberOf", + ), + ) + + # Setup mocks + mock_group_response = Mock(spec=GroupResponse) + mock_group_response.__iter__ = Mock(return_value=iter(sample_groups.values())) + mock_client.group.get_all.return_value = mock_group_response + mock_client.sso.get_all_group_mappings.return_value = [valid_mapping] + + # Execute + deleted_count = diagnostic.cleanup_orphaned_mappings("okta") + + # Verify - no deletions should occur + assert deleted_count == 0 + mock_client.sso.delete_group_mapping.assert_not_called() + + +def test_list_all_mappings(diagnostic, mock_client, sample_mappings): + """Test listing all SSO group mappings.""" + # Setup mock + mock_client.sso.get_all_group_mappings.return_value = sample_mappings + + # Execute - should not raise any errors + diagnostic.list_all_mappings("okta") + + # Verify + mock_client.sso.get_all_group_mappings.assert_called_once_with(sso_alias="okta") + + +def test_list_all_mappings_empty(diagnostic, mock_client): + """Test listing when no mappings exist.""" + # Setup mock + mock_client.sso.get_all_group_mappings.return_value = [] + + # Execute - should not raise any errors + diagnostic.list_all_mappings("okta") + + # Verify + mock_client.sso.get_all_group_mappings.assert_called_once_with(sso_alias="okta") From 37ee2196a889fbdf7e6ac3bc92d9892c7ccb40a6 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 11 Feb 2026 14:29:10 +0000 Subject: [PATCH 2/4] fix: Code formatting and linting issues for SSO diagnostic tool - Remove unused Set import from diagnose_orphaned_group_mappings.py - Fix trailing whitespace in all modified files - Add None check for mapping.id before deletion to satisfy mypy - Format code to pass ruff format and ruff check --- pyatlan/client/aio/sso.py | 14 +- pyatlan/client/sso.py | 14 +- .../sso/diagnose_orphaned_group_mappings.py | 141 ++++++++++-------- 3 files changed, 95 insertions(+), 74 deletions(-) diff --git a/pyatlan/client/aio/sso.py b/pyatlan/client/aio/sso.py index 23a168b4f..5e92e9e98 100644 --- a/pyatlan/client/aio/sso.py +++ b/pyatlan/client/aio/sso.py @@ -80,19 +80,19 @@ async def create_group_mapping( "Atlan group must have a 'name' field populated", sso_alias, ) - + logger.info( f"Creating SSO group mapping: {atlan_group.alias} (ID: {atlan_group.id}) " f"<-> {sso_group_name} (SSO: {sso_alias})" ) - + await self._check_existing_group_mappings(sso_alias, atlan_group) endpoint, request_obj = SSOCreateGroupMapping.prepare_request( sso_alias, atlan_group, sso_group_name ) raw_json = await self._client._call_api(endpoint, request_obj=request_obj) result = SSOCreateGroupMapping.process_response(raw_json) - + logger.info(f"Successfully created SSO group mapping with ID: {result.id}") return result @@ -153,7 +153,7 @@ async def get_group_mapping(self, sso_alias: str, group_map_id: str) -> SSOMappe async def delete_group_mapping(self, sso_alias: str, group_map_id: str) -> None: """ Deletes an existing Atlan SSO group mapping. - + Note: This only deletes the SSO mapping (identity provider mapper). If you're experiencing issues with Okta Push Groups failing with stale externalId errors, you may need to run the diagnostic script @@ -165,17 +165,17 @@ async def delete_group_mapping(self, sso_alias: str, group_map_id: str) -> None: :returns: an empty response (`None`). """ logger.info(f"Deleting SSO group mapping: {group_map_id} (SSO: {sso_alias})") - + endpoint, request_obj = SSODeleteGroupMapping.prepare_request( sso_alias, group_map_id ) raw_json = await self._client._call_api(endpoint, request_obj=request_obj) - + logger.info(f"Successfully deleted SSO group mapping: {group_map_id}") logger.debug( "If you're experiencing Okta Push Groups issues with stale externalId, " "run: python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings " f"--mode diagnose --sso-alias {sso_alias}" ) - + return raw_json diff --git a/pyatlan/client/sso.py b/pyatlan/client/sso.py index 64fd449c2..9bbcbf260 100644 --- a/pyatlan/client/sso.py +++ b/pyatlan/client/sso.py @@ -72,19 +72,19 @@ def create_group_mapping( "Atlan group must have a 'name' field populated", sso_alias, ) - + logger.info( f"Creating SSO group mapping: {atlan_group.alias} (ID: {atlan_group.id}) " f"<-> {sso_group_name} (SSO: {sso_alias})" ) - + self._check_existing_group_mappings(sso_alias, atlan_group) endpoint, request_obj = SSOCreateGroupMapping.prepare_request( sso_alias, atlan_group, sso_group_name ) raw_json = self._client._call_api(endpoint, request_obj=request_obj) result = SSOCreateGroupMapping.process_response(raw_json) - + logger.info(f"Successfully created SSO group mapping with ID: {result.id}") return result @@ -145,7 +145,7 @@ def get_group_mapping(self, sso_alias: str, group_map_id: str) -> SSOMapper: def delete_group_mapping(self, sso_alias: str, group_map_id: str) -> None: """ Deletes an existing Atlan SSO group mapping. - + Note: This only deletes the SSO mapping (identity provider mapper). If you're experiencing issues with Okta Push Groups failing with stale externalId errors, you may need to run the diagnostic script @@ -157,17 +157,17 @@ def delete_group_mapping(self, sso_alias: str, group_map_id: str) -> None: :returns: an empty response (`None`). """ logger.info(f"Deleting SSO group mapping: {group_map_id} (SSO: {sso_alias})") - + endpoint, request_obj = SSODeleteGroupMapping.prepare_request( sso_alias, group_map_id ) raw_json = self._client._call_api(endpoint, request_obj=request_obj) - + logger.info(f"Successfully deleted SSO group mapping: {group_map_id}") logger.debug( "If you're experiencing Okta Push Groups issues with stale externalId, " "run: python -m pyatlan.samples.sso.diagnose_orphaned_group_mappings " f"--mode diagnose --sso-alias {sso_alias}" ) - + return raw_json diff --git a/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py b/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py index b5bd03604..7dfa7cfe1 100644 --- a/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py +++ b/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py @@ -30,7 +30,7 @@ import argparse import logging import sys -from typing import Dict, List, Optional, Set +from typing import Dict, List, Optional from pyatlan.client.atlan import AtlanClient from pyatlan.errors import AtlanError @@ -54,12 +54,12 @@ def __init__(self, client: AtlanClient): def get_all_groups(self) -> Dict[str, AtlanGroup]: """ Retrieve all Atlan groups. - + :returns: Dictionary mapping group ID to AtlanGroup """ logger.info("Retrieving all Atlan groups...") groups = {} - + try: response = self.client.group.get_all(limit=100) for group in response: @@ -69,18 +69,18 @@ def get_all_groups(self) -> Dict[str, AtlanGroup]: except AtlanError as e: logger.error(f"Error retrieving groups: {e}") raise - + return groups def get_all_sso_mappings(self, sso_alias: str) -> List[SSOMapper]: """ Retrieve all SSO group mappings for a given SSO provider. - + :param sso_alias: SSO provider alias (e.g., 'okta', 'azure', 'jumpcloud') :returns: List of SSO group mappings """ logger.info(f"Retrieving SSO group mappings for '{sso_alias}'...") - + try: mappings = self.client.sso.get_all_group_mappings(sso_alias=sso_alias) logger.info(f"Found {len(mappings)} SSO group mappings") @@ -94,11 +94,11 @@ def diagnose_orphaned_mappings( ) -> Dict[str, List[SSOMapper]]: """ Diagnose orphaned SSO group mappings. - + An orphaned mapping is one where: - The mapping references a group ID that doesn't exist - The mapping's group name doesn't match the actual group name - + :param sso_alias: SSO provider alias :param target_group_name: Optional specific group name to check :returns: Dictionary with 'orphaned' and 'valid' mapping lists @@ -106,57 +106,62 @@ def diagnose_orphaned_mappings( logger.info("=" * 80) logger.info("DIAGNOSING SSO GROUP MAPPINGS") logger.info("=" * 80) - + groups = self.get_all_groups() mappings = self.get_all_sso_mappings(sso_alias) - + # Create reverse lookup: group name -> group ID group_name_to_id = {g.name: g.id for g in groups.values() if g.name} - + orphaned = [] valid = [] suspicious = [] - + logger.info("\nAnalyzing mappings...") logger.info("-" * 80) - + for mapping in mappings: if not mapping.name or not mapping.config or not mapping.config.group_name: logger.warning(f"āš ļø Mapping {mapping.id} has incomplete data") suspicious.append(mapping) continue - + # Extract group ID from mapping name (format: --) - mapping_group_id = mapping.name.split("--")[0] if "--" in mapping.name else None + mapping_group_id = ( + mapping.name.split("--")[0] if "--" in mapping.name else None + ) mapped_group_name = mapping.config.group_name - + # Skip if target group specified and this isn't it if target_group_name and mapped_group_name != target_group_name: continue - + # Check if this is orphaned is_orphaned = False orphan_reason = [] - + # Case 1: Group ID in mapping doesn't exist if mapping_group_id and mapping_group_id not in groups: is_orphaned = True orphan_reason.append(f"Group ID '{mapping_group_id}' not found") - + # Case 2: Group name doesn't match any existing group if mapped_group_name not in group_name_to_id: is_orphaned = True orphan_reason.append(f"Group name '{mapped_group_name}' not found") - + # Case 3: Group ID in mapping doesn't match current group with that name - elif mapping_group_id and group_name_to_id.get(mapped_group_name) != mapping_group_id: + elif ( + mapping_group_id + and group_name_to_id.get(mapped_group_name) != mapping_group_id + ): is_orphaned = True current_id = group_name_to_id.get(mapped_group_name) orphan_reason.append( f"Group ID mismatch: mapping has '{mapping_group_id}' " f"but current group has '{current_id}'" ) - + if is_orphaned: orphaned.append(mapping) logger.warning(f"šŸ”“ ORPHANED: {mapping.id}") @@ -173,7 +178,7 @@ def diagnose_orphaned_mappings( logger.info(f" Group Name: {mapped_group_name}") logger.info(f" SSO Group: {mapping.config.attribute_value}") logger.info("") - + logger.info("=" * 80) logger.info("SUMMARY") logger.info("=" * 80) @@ -182,7 +187,7 @@ def diagnose_orphaned_mappings( logger.info(f"šŸ”“ Orphaned mappings: {len(orphaned)}") logger.info(f"āš ļø Suspicious mappings: {len(suspicious)}") logger.info("=" * 80) - + return { "orphaned": orphaned, "valid": valid, @@ -192,19 +197,19 @@ def diagnose_orphaned_mappings( def list_all_mappings(self, sso_alias: str) -> None: """ List all SSO group mappings in a readable format. - + :param sso_alias: SSO provider alias """ logger.info("=" * 80) logger.info(f"SSO GROUP MAPPINGS FOR '{sso_alias}'") logger.info("=" * 80) - + mappings = self.get_all_sso_mappings(sso_alias) - + if not mappings: logger.info("No SSO group mappings found") return - + for i, mapping in enumerate(mappings, 1): logger.info(f"\n{i}. Mapping ID: {mapping.id}") logger.info(f" Mapping Name: {mapping.name}") @@ -214,7 +219,7 @@ def list_all_mappings(self, sso_alias: str) -> None: logger.info(f" Sync Mode: {mapping.config.sync_mode}") logger.info(f" Provider: {mapping.identity_provider_alias}") logger.info(f" Mapper Type: {mapping.identity_provider_mapper}") - + logger.info("\n" + "=" * 80) def cleanup_orphaned_mappings( @@ -222,7 +227,7 @@ def cleanup_orphaned_mappings( ) -> int: """ Clean up orphaned SSO group mappings. - + :param sso_alias: SSO provider alias :param interactive: If True, ask for confirmation before each deletion :returns: Number of mappings deleted @@ -230,19 +235,23 @@ def cleanup_orphaned_mappings( logger.info("=" * 80) logger.info("CLEANUP MODE") logger.info("=" * 80) - + results = self.diagnose_orphaned_mappings(sso_alias) orphaned = results["orphaned"] - + if not orphaned: logger.info("\nāœ… No orphaned mappings found. Nothing to clean up!") return 0 - + logger.warning(f"\nāš ļø Found {len(orphaned)} orphaned mapping(s) to clean up") - + deleted_count = 0 - + for mapping in orphaned: + if not mapping.id: + logger.warning("Skipping mapping with no ID") + continue + if interactive: logger.info("\n" + "-" * 80) logger.info(f"Mapping ID: {mapping.id}") @@ -250,13 +259,13 @@ def cleanup_orphaned_mappings( if mapping.config: logger.info(f"Group Name: {mapping.config.group_name}") logger.info(f"SSO Group: {mapping.config.attribute_value}") - + response = input("\nDelete this mapping? (y/n): ").strip().lower() - - if response != 'y': + + if response != "y": logger.info("Skipped") continue - + try: logger.info(f"Deleting mapping {mapping.id}...") self.client.sso.delete_group_mapping( @@ -267,11 +276,11 @@ def cleanup_orphaned_mappings( deleted_count += 1 except AtlanError as e: logger.error(f"āŒ Error deleting mapping {mapping.id}: {e}") - + logger.info("\n" + "=" * 80) logger.info(f"Cleanup complete. Deleted {deleted_count} orphaned mapping(s)") logger.info("=" * 80) - + return deleted_count @@ -282,33 +291,33 @@ def main(): formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) - + parser.add_argument( "--mode", required=True, choices=["diagnose", "cleanup", "list"], help="Operation mode: diagnose (check for issues), cleanup (remove orphaned mappings), or list (show all mappings)", ) - + parser.add_argument( "--sso-alias", required=True, help="SSO provider alias (e.g., 'okta', 'azure', 'jumpcloud')", ) - + parser.add_argument( "--group-name", help="Optional: Specific Atlan group name to diagnose", ) - + parser.add_argument( "--non-interactive", action="store_true", help="Run cleanup without prompting for confirmation (dangerous!)", ) - + args = parser.parse_args() - + # Initialize client try: client = AtlanClient() @@ -317,46 +326,58 @@ def main(): logger.error(f"Failed to initialize Atlan client: {e}") logger.error("Make sure ATLAN_BASE_URL and ATLAN_API_KEY are set") sys.exit(1) - + # Initialize diagnostic tool diagnostic = SSOGroupMappingDiagnostic(client) - + # Execute requested mode try: if args.mode == "list": diagnostic.list_all_mappings(args.sso_alias) - + elif args.mode == "diagnose": results = diagnostic.diagnose_orphaned_mappings( sso_alias=args.sso_alias, target_group_name=args.group_name, ) - + if results["orphaned"]: - logger.info("\nšŸ’” TIP: Run with --mode cleanup to remove orphaned mappings") + logger.info( + "\nšŸ’” TIP: Run with --mode cleanup to remove orphaned mappings" + ) sys.exit(1) # Exit with error code if orphaned mappings found - + elif args.mode == "cleanup": if args.non_interactive: logger.warning("āš ļø Running in non-interactive mode!") - logger.warning("āš ļø All orphaned mappings will be deleted automatically!") - response = input("Are you sure you want to continue? (yes/no): ").strip().lower() + logger.warning( + "āš ļø All orphaned mappings will be deleted automatically!" + ) + response = ( + input("Are you sure you want to continue? (yes/no): ") + .strip() + .lower() + ) if response != "yes": logger.info("Cleanup cancelled") sys.exit(0) - + deleted = diagnostic.cleanup_orphaned_mappings( sso_alias=args.sso_alias, interactive=not args.non_interactive, ) - + if deleted > 0: logger.info("\nāœ… Cleanup complete!") logger.info("\nšŸ’” NEXT STEPS:") - logger.info("1. In Okta, try unlinking and re-linking the affected group") + logger.info( + "1. In Okta, try unlinking and re-linking the affected group" + ) logger.info("2. Use 'Link Group' (not 'Create') when re-linking") - logger.info("3. The group push should now work without the stale externalId error") - + logger.info( + "3. The group push should now work without the stale externalId error" + ) + except KeyboardInterrupt: logger.info("\n\nOperation cancelled by user") sys.exit(130) From 251379e9f9cf5931590194b9c604b06e736a5215 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 11 Feb 2026 14:34:33 +0000 Subject: [PATCH 3/4] fix: Additional code formatting for Python 3.10 compatibility - Break long lines in docstrings and help text - Split long conditional statements across multiple lines - Format function signatures to be within line length limits --- .../sso/diagnose_orphaned_group_mappings.py | 14 +++++++++++--- tests/unit/test_sso_diagnostic.py | 8 ++++---- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py b/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py index 7dfa7cfe1..18f3844ad 100644 --- a/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py +++ b/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py @@ -14,7 +14,8 @@ python diagnose_orphaned_group_mappings.py --mode diagnose --sso-alias # Diagnose specific group - python diagnose_orphaned_group_mappings.py --mode diagnose --sso-alias --group-name + python diagnose_orphaned_group_mappings.py --mode diagnose \\ + --sso-alias --group-name # Clean up orphaned mappings (interactive) python diagnose_orphaned_group_mappings.py --mode cleanup --sso-alias @@ -121,7 +122,11 @@ def diagnose_orphaned_mappings( logger.info("-" * 80) for mapping in mappings: - if not mapping.name or not mapping.config or not mapping.config.group_name: + if ( + not mapping.name + or not mapping.config + or not mapping.config.group_name + ): logger.warning(f"āš ļø Mapping {mapping.id} has incomplete data") suspicious.append(mapping) continue @@ -296,7 +301,10 @@ def main(): "--mode", required=True, choices=["diagnose", "cleanup", "list"], - help="Operation mode: diagnose (check for issues), cleanup (remove orphaned mappings), or list (show all mappings)", + help=( + "Operation mode: diagnose (check for issues), " + "cleanup (remove orphaned mappings), or list (show all mappings)" + ), ) parser.add_argument( diff --git a/tests/unit/test_sso_diagnostic.py b/tests/unit/test_sso_diagnostic.py index 9b4295668..e50cdd62a 100644 --- a/tests/unit/test_sso_diagnostic.py +++ b/tests/unit/test_sso_diagnostic.py @@ -171,7 +171,9 @@ def test_diagnose_with_target_group_filter( mock_client.sso.get_all_group_mappings.return_value = sample_mappings # Execute - filter for test_group_1 - results = diagnostic.diagnose_orphaned_mappings("okta", target_group_name="test_group_1") + results = diagnostic.diagnose_orphaned_mappings( + "okta", target_group_name="test_group_1" + ) # Verify - should only check mappings for test_group_1 # mapping-1 is valid, mapping-4 is orphaned (both reference test_group_1) @@ -246,9 +248,7 @@ def test_cleanup_orphaned_mappings_non_interactive( assert mock_client.sso.delete_group_mapping.call_count == 2 -def test_cleanup_no_orphaned_mappings( - diagnostic, mock_client, sample_groups -): +def test_cleanup_no_orphaned_mappings(diagnostic, mock_client, sample_groups): """Test cleanup when there are no orphaned mappings.""" # Create only valid mappings valid_mapping = SSOMapper( From acc405339be9cf70178648eb4d535937b21e50c9 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 11 Feb 2026 14:38:03 +0000 Subject: [PATCH 4/4] fix: Remove trailing whitespace in docstring Remove trailing whitespace from empty lines in diagnostic script docstring --- pyatlan/samples/sso/diagnose_orphaned_group_mappings.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py b/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py index 18f3844ad..28e308f97 100644 --- a/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py +++ b/pyatlan/samples/sso/diagnose_orphaned_group_mappings.py @@ -12,14 +12,14 @@ Usage: # Diagnose orphaned mappings python diagnose_orphaned_group_mappings.py --mode diagnose --sso-alias - + # Diagnose specific group python diagnose_orphaned_group_mappings.py --mode diagnose \\ --sso-alias --group-name - + # Clean up orphaned mappings (interactive) python diagnose_orphaned_group_mappings.py --mode cleanup --sso-alias - + # List all SSO group mappings python diagnose_orphaned_group_mappings.py --mode list --sso-alias