Connectors Module Architecture #17

@tfius

Parent: #10 (EPIC: Transform DataCortex into Context Engine)
Priority: MEDIUM | Phase: 4 - Multimodal Integration | Complexity: Medium
Depends on: #12 (External Links)

What to Implement

Create the foundational connectors module architecture that enables DataCortex to ingest data from external systems (GitHub, Slack, etc.).

Features

  1. Connector base class/protocol
  2. Connector registry and discovery
  3. Configuration schema for connectors
  4. Sync scheduling (manual, periodic)
  5. Rate limiting and caching
  6. Error handling and retry logic
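
Feature 6 is not covered by the implementation steps below, so here is a minimal sketch of one common approach: retrying transient failures with exponential backoff and jitter. `TransientError`, `with_retry`, and `make_flaky` are hypothetical names that do not come from DataCortex, and which exceptions count as retryable is an assumption.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, HTTP 429/5xx)."""


async def with_retry(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
) -> T:
    """Run `operation`, retrying TransientError with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except TransientError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            # Upper bound doubles per attempt (base, 2*base, 4*base, ...), capped.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Full jitter: sleep a random time in [0, delay] to avoid lockstep retries.
            await asyncio.sleep(random.uniform(0, delay))
    raise AssertionError("unreachable when max_attempts >= 1")


def make_flaky(failures: int) -> Callable[[], Awaitable[str]]:
    """Demo helper: an operation that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def operation() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise TransientError(f"attempt {state['calls']} failed")
        return f"ok after {state['calls']} calls"

    return operation
```

A connector could wrap each outbound request in `with_retry` and append the final error message to `SyncResult.errors` instead of aborting the whole sync.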

How to Implement

Step 1: Define Connector Protocol

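# src/datacortex/connectors/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncIterator, Optional

@dataclass
class ExternalEntity:
    id: str                    # Unique ID in external system
    source: str                # github, slack, linear
    entity_type: str           # issue, message, user
    title: str
    content: Optional[str]
    url: str
    created_at: datetime
    updated_at: datetime
    author: Optional[str]
    metadata: Optional[dict] = None   # connector-specific extras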

@dataclass
class SyncResult:
    connector: str
    entities_fetched: int
    entities_created: int
    entities_updated: int
    errors: list[str]
    duration_seconds: float
    last_sync: datetime

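class Connector(ABC):
    name: str                  # registry key, e.g. "github"
    entity_types: list[str]    # entity types this connector can fetch

    @abstractmethod
    async def authenticate(self) -> bool:
        pass

    @abstractmethod
    async def test_connection(self) -> bool:
        pass

    # Declared without `async`: implementations are async generators
    # (`async def` + `yield`), so callers iterate with `async for`.
    @abstractmethod
    def fetch_entities(
        self, entity_type: str, since: Optional[datetime] = None, limit: Optional[int] = None
    ) -> AsyncIterator[ExternalEntity]:
        pass

    @abstractmethod
    async def fetch_entity(self, entity_type: str, entity_id: str) -> Optional[ExternalEntity]:
        pass

    # Node is DataCortex's graph node type; its import is not shown in this issue.
    def to_node(self, entity: ExternalEntity) -> Node:
        """Convert external entity to graph node."""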

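To illustrate how a subclass might satisfy `fetch_entities`, here is a minimal sketch of an in-memory connector, the kind that could be useful in tests. `InMemoryConnector` and `collect` are hypothetical, and `ExternalEntity` and `Connector` are trimmed stand-ins for the Step 1 classes so the example runs on its own. Treating `since` as an exclusive lower bound on `updated_at` is an assumption.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import AsyncIterator, Optional


@dataclass
class ExternalEntity:  # trimmed stand-in for the Step 1 dataclass
    id: str
    source: str
    entity_type: str
    title: str
    updated_at: datetime


class Connector(ABC):  # trimmed stand-in for the Step 1 base class
    name: str

    @abstractmethod
    def fetch_entities(
        self, entity_type: str, since: Optional[datetime] = None, limit: Optional[int] = None
    ) -> AsyncIterator[ExternalEntity]:
        ...


class InMemoryConnector(Connector):
    """Hypothetical connector that serves entities from a list."""

    name = "memory"

    def __init__(self, entities: list[ExternalEntity]):
        self._entities = entities

    async def fetch_entities(self, entity_type, since=None, limit=None):
        yielded = 0
        for entity in self._entities:
            if entity.entity_type != entity_type:
                continue
            if since is not None and entity.updated_at <= since:
                continue  # incremental sync: skip entities not changed after `since`
            if limit is not None and yielded >= limit:
                return
            yield entity
            yielded += 1


async def collect(connector: Connector, entity_type: str, **kwargs) -> list[ExternalEntity]:
    """Drain the async iterator into a list."""
    return [e async for e in connector.fetch_entities(entity_type, **kwargs)]
```

Because `fetch_entities` is an async generator, a sync loop can start storing nodes before the external API has returned every page.
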
Step 2: Create Connector Registry

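# src/datacortex/connectors/registry.py
from typing import Optional, Type

from .base import Connector

class ConnectorRegistry:
    # Maps connector name -> connector class; filled in by @register
    _connectors: dict[str, Type[Connector]] = {}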

    @classmethod
    def register(cls, name: str):
        def decorator(connector_class):
            cls._connectors[name] = connector_class
            return connector_class
        return decorator

    @classmethod
    def get(cls, name: str) -> Optional[Type[Connector]]:
        return cls._connectors.get(name)

    @classmethod
    def list_available(cls) -> list[str]:
        return list(cls._connectors.keys())

    @classmethod
    def create(cls, name: str, config: dict) -> Connector:
        connector_class = cls.get(name)
        if not connector_class:
            raise ValueError(f"Unknown connector: {name}")
        return connector_class(**config)
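
The registry is used through the `register` decorator and `create`. The sketch below uses a condensed copy of the registry and a hypothetical `GitHubConnector` whose constructor parameters are assumed from the Step 3 config. Note that `create` passes the whole config dict as keyword arguments, so keys such as `enabled` and `sync` reach the constructor too.

```python
class ConnectorRegistry:  # condensed copy of the Step 2 registry
    _connectors: dict[str, type] = {}

    @classmethod
    def register(cls, name: str):
        def decorator(connector_class):
            cls._connectors[name] = connector_class
            return connector_class
        return decorator

    @classmethod
    def create(cls, name: str, config: dict):
        connector_class = cls._connectors.get(name)
        if not connector_class:
            raise ValueError(f"Unknown connector: {name}")
        return connector_class(**config)


@ConnectorRegistry.register("github")
class GitHubConnector:  # hypothetical; a real one would subclass Connector
    def __init__(self, token: str, org: str, repos: list[str], **_ignored):
        # **_ignored absorbs keys such as `enabled` and `sync`, which the
        # Step 3 config carries and create() passes through unchanged.
        self.token = token
        self.org = org
        self.repos = repos


connector = ConnectorRegistry.create(
    "github",
    {
        "enabled": True,
        "token": "example-token",
        "org": "datafund",
        "repos": ["datacortex"],
        "sync": {"entities": ["issues"]},
    },
)
```

An alternative design would have `create` strip the non-constructor keys before instantiating, so each connector's `__init__` only declares what it actually uses.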

Step 3: Configuration Schema

# config/connectors.yaml
connectors:
  github:
    enabled: true
    token: ${GITHUB_TOKEN}
    org: datafund
    repos:
      - datacortex
      - datacore
    sync:
      entities:
        - issues
        - pull_requests
      interval: 3600
      since_days: 30

  slack:
    enabled: false
    token: ${SLACK_TOKEN}
    channels:
      - engineering
    sync:
      entities:
        - messages
      interval: 1800

Step 4: Sync Manager

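# src/datacortex/connectors/sync.py
from datetime import datetime
from typing import Optional

from .base import SyncResult
from .registry import ConnectorRegistry

class SyncManager:
    def __init__(self, config: dict):
        self.config = config
        self.results: dict[str, SyncResult] = {}

    async def sync_connector(
        self, name: str, force: bool = False, since: Optional[datetime] = None
    ) -> SyncResult:
        connector_config = self.config['connectors'][name]
        connector = ConnectorRegistry.create(name, connector_config)
        if not await connector.authenticate():
            raise RuntimeError(f"Authentication failed for connector: {name}")

        for entity_type in connector_config['sync']['entities']:
            async for entity in connector.fetch_entities(entity_type, since=since):
                node = connector.to_node(entity)
                # _store_external_node is not shown in this issue
                await self._store_external_node(node, entity)

        return SyncResult(...)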

    async def sync_all(self, force: bool = False) -> list[SyncResult]:
        results = []
        for name, config in self.config['connectors'].items():
            if config.get('enabled'):
                results.append(await self.sync_connector(name, force=force))
        return results
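
The sync manager above accepts `since` and `force` but does not show how they interact with the `since_days` and `interval` settings from Step 3. One possible resolution order is sketched below. The precedence rules and the meaning of `force` (ignore the last sync time and re-fetch the configured window) are assumptions, and `resolve_since` and `is_due` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def resolve_since(
    sync_config: dict,
    last_sync: Optional[datetime],
    explicit_since: Optional[datetime] = None,
    force: bool = False,
    now: Optional[datetime] = None,
) -> Optional[datetime]:
    """Pick the lower time bound for one sync run."""
    now = now or datetime.now(timezone.utc)
    if explicit_since is not None:
        return explicit_since  # an explicit --since value wins
    if last_sync is not None and not force:
        return last_sync  # incremental: only changes since the last run
    since_days = sync_config.get("since_days")
    if since_days is None:
        return None  # no window configured: fetch everything
    return now - timedelta(days=since_days)


def is_due(sync_config: dict, last_sync: Optional[datetime], now: Optional[datetime] = None) -> bool:
    """True when a periodic sync should run, based on `interval` in seconds."""
    now = now or datetime.now(timezone.utc)
    if last_sync is None:
        return True  # never synced before
    return (now - last_sync).total_seconds() >= sync_config["interval"]
```

Passing `now` explicitly keeps both helpers deterministic in tests.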

Step 5: Add CLI Commands

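import click

# `cli` is assumed to be the application's existing top-level click group (not shown in this issue).
@cli.group()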
def connector():
    """Manage external connectors."""
    pass

@connector.command('list')
def list_connectors():
    """List available connectors."""

@connector.command('sync')
@click.argument('name', required=False)
@click.option('--force', is_flag=True)
@click.option('--since')
def sync_connector(name, force, since):
    """Sync external connector(s)."""

@connector.command('test')
@click.argument('name')
def test_connector(name):
    """Test connector connection."""

Acceptance Criteria

  • Connector base class defined
  • Registry allows connector discovery
  • Configuration file for connector settings
  • Sync manager handles incremental updates
  • CLI commands for listing, syncing, testing
  • Error handling and reporting
  • Rate limiting support in base class
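
The last criterion is not covered by the steps above. As a minimal sketch of what rate limiting support could look like, the limiter below spaces calls evenly so that at most `rate` requests start per `per` seconds. `RateLimiter` and `timed_acquires` are hypothetical names, and even spacing is only one possible policy (a token bucket would allow bursts).

```python
import asyncio
import time


class RateLimiter:
    """Allow at most `rate` acquisitions per `per` seconds by spacing calls evenly."""

    def __init__(self, rate: int, per: float = 1.0):
        self._min_interval = per / rate
        self._next_allowed = 0.0  # monotonic time at which the next call may start
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        # The lock serializes callers so concurrent tasks queue up in order.
        async with self._lock:
            now = time.monotonic()
            wait = self._next_allowed - now
            if wait > 0:
                await asyncio.sleep(wait)
                now = time.monotonic()
            # Schedule the following slot one interval after this one.
            self._next_allowed = max(now, self._next_allowed) + self._min_interval


async def timed_acquires(rate: int, count: int) -> float:
    """Demo helper: seconds taken to pass `count` calls through the limiter."""
    limiter = RateLimiter(rate=rate)  # created inside the running event loop
    start = time.monotonic()
    for _ in range(count):
        await limiter.acquire()
    return time.monotonic() - start
```

A base class could hold one limiter per connector instance and call `await self._limiter.acquire()` before each outbound request, which keeps the policy out of individual connectors.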
