Status: Open
Labels: phase-4 (Phase 4: Multimodal Integration), priority-medium (Medium priority)
Description
Parent: #10 (EPIC: Transform DataCortex into Context Engine)
Priority: MEDIUM | Phase: 4 - Multimodal Integration | Complexity: Medium
Depends on: #12 (External Links)
What to Implement
Create the foundational connectors module architecture that enables DataCortex to ingest data from external systems (GitHub, Slack, etc.).
Features
- Connector base class/protocol
- Connector registry and discovery
- Configuration schema for connectors
- Sync scheduling (manual, periodic)
- Rate limiting and caching
- Error handling and retry logic
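The retry item above is not specified further in this issue. One common approach is exponential backoff around an async call; the sketch below assumes that approach. `retry_async`, `max_attempts`, `base_delay`, and `retry_on` are hypothetical names, not part of DataCortex.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    call: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await `call()` and retry on transient errors with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            # The delay doubles on each attempt; a little jitter keeps
            # several connectors from retrying in lockstep.
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 10))
    raise AssertionError("unreachable: the loop always returns or raises")
```

A connector could wrap each outgoing request as `await retry_async(lambda: self._get(url))`, where `_get` is whatever request method the connector defines. Errors outside `retry_on` propagate immediately, which keeps authentication failures from being retried.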
How to Implement
Step 1: Define Connector Protocol
# src/datacortex/connectors/base.py
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import AsyncIterator, Optional


@dataclass
class ExternalEntity:
    id: str            # Unique ID in external system
    source: str        # github, slack, linear
    entity_type: str   # issue, message, user
    title: str
    content: Optional[str]
    url: str
    created_at: datetime
    updated_at: datetime
    author: Optional[str]
    metadata: dict = field(default_factory=dict)  # fresh dict per instance


@dataclass
class SyncResult:
    connector: str
    entities_fetched: int
    entities_created: int
    entities_updated: int
    errors: list[str]
    duration_seconds: float
    last_sync: datetime


class Connector(ABC):
    name: str
    entity_types: list[str]

    @abstractmethod
    async def authenticate(self) -> bool:
        pass

    @abstractmethod
    async def test_connection(self) -> bool:
        pass

    # Declared without `async` so that implementations written as async
    # generators (`async def` + `yield`) match this signature and can be
    # consumed with `async for`.
    @abstractmethod
    def fetch_entities(
        self,
        entity_type: str,
        since: Optional[datetime] = None,
        limit: Optional[int] = None,
    ) -> AsyncIterator[ExternalEntity]:
        pass

    @abstractmethod
    async def fetch_entity(self, entity_type: str, entity_id: str) -> Optional[ExternalEntity]:
        pass

    # `Node` is the project's graph node type; its import is not shown in this issue.
    def to_node(self, entity: ExternalEntity) -> Node:
        """Convert external entity to graph node."""

Step 2: Create Connector Registry
# src/datacortex/connectors/registry.py
from typing import Optional, Type

from .base import Connector


class ConnectorRegistry:
    # Maps connector name -> connector class, shared across the process.
    _connectors: dict[str, Type[Connector]] = {}

    @classmethod
    def register(cls, name: str):
        """Class decorator that registers a connector class under `name`."""
        def decorator(connector_class: Type[Connector]) -> Type[Connector]:
            cls._connectors[name] = connector_class
            return connector_class
        return decorator

    @classmethod
    def get(cls, name: str) -> Optional[Type[Connector]]:
        return cls._connectors.get(name)

    @classmethod
    def list_available(cls) -> list[str]:
        return list(cls._connectors.keys())

    @classmethod
    def create(cls, name: str, config: dict) -> Connector:
        connector_class = cls.get(name)
        if not connector_class:
            raise ValueError(f"Unknown connector: {name}")
        # Config keys are passed to the constructor as keyword arguments.
        return connector_class(**config)

Step 3: Configuration Schema
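The file in this step keeps secrets out of the config by writing them as `${GITHUB_TOKEN}`-style placeholders. YAML itself has no environment-variable interpolation, so something has to resolve them after parsing. A minimal sketch, assuming the placeholders are read from the process environment; `expand_env` is a hypothetical helper, not an existing DataCortex function.

```python
import os
import re
from typing import Any, Mapping

# Matches ${NAME} where NAME is a conventional environment variable name.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value: Any, env: Mapping[str, str] = os.environ) -> Any:
    """Return a copy of `value` with ${VAR} replaced in every nested string."""
    if isinstance(value, str):
        def substitute(match: re.Match) -> str:
            name = match.group(1)
            if name not in env:
                raise KeyError(f"Environment variable {name} is not set")
            return env[name]
        return _PLACEHOLDER.sub(substitute, value)
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    return value  # numbers, booleans, and None pass through unchanged


# Example: a parsed config section with one placeholder.
parsed = {"github": {"enabled": True, "token": "${GITHUB_TOKEN}", "repos": ["datacortex"]}}
resolved = expand_env(parsed, env={"GITHUB_TOKEN": "example-token"})
```

Because a missing variable raises `KeyError`, it may be worth expanding only the sections with `enabled: true`, so that a disabled connector does not require its token to be set.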
# config/connectors.yaml
connectors:
  github:
    enabled: true
    token: ${GITHUB_TOKEN}
    org: datafund
    repos:
      - datacortex
      - datacore
    sync:
      entities:
        - issues
        - pull_requests
      interval: 3600
      since_days: 30
  slack:
    enabled: false
    token: ${SLACK_TOKEN}
    channels:
      - engineering
    sync:
      entities:
        - messages
      interval: 1800

Step 4: Sync Manager
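The sync loop in this step consumes `connector.fetch_entities(...)` with `async for`, so each connector has to implement that method as an async generator. A rough, self-contained sketch of such a connector follows. It redeclares trimmed stand-ins for `ExternalEntity` and `Connector` from Step 1, and `InMemoryConnector`, `collect`, and the sample data are hypothetical.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncIterator, Optional


@dataclass
class ExternalEntity:  # trimmed stand-in for the Step 1 dataclass
    id: str
    source: str
    entity_type: str
    title: str
    updated_at: datetime


class Connector(ABC):  # trimmed stand-in for the Step 1 base class
    name: str
    entity_types: list[str]

    # Declared without `async` so that async-generator overrides match it.
    @abstractmethod
    def fetch_entities(
        self, entity_type: str, since: Optional[datetime] = None, limit: Optional[int] = None
    ) -> AsyncIterator[ExternalEntity]:
        ...


class InMemoryConnector(Connector):
    """Hypothetical connector that serves entities from a list instead of an API."""

    name = "memory"
    entity_types = ["issue"]

    def __init__(self, entities: list[ExternalEntity]):
        self._entities = entities

    async def fetch_entities(self, entity_type, since=None, limit=None):
        # `async def` plus `yield` makes this an async generator.
        yielded = 0
        for entity in self._entities:
            if entity.entity_type != entity_type:
                continue
            if since is not None and entity.updated_at <= since:
                continue  # incremental sync: skip entities unchanged since `since`
            if limit is not None and yielded >= limit:
                return
            yield entity
            yielded += 1


async def collect(connector: Connector, entity_type: str, **kwargs) -> list[str]:
    """Drain the async iterator the same way the sync loop does."""
    return [e.id async for e in connector.fetch_entities(entity_type, **kwargs)]


sample = [
    ExternalEntity("1", "memory", "issue", "Old issue", datetime(2024, 1, 1)),
    ExternalEntity("2", "memory", "issue", "New issue", datetime(2024, 2, 1)),
    ExternalEntity("3", "memory", "message", "A message", datetime(2024, 2, 1)),
]
connector = InMemoryConnector(sample)
recent_ids = asyncio.run(collect(connector, "issue", since=datetime(2024, 1, 15)))
```

A connector like this is also handy as a test double for the sync manager, since it exercises the `since` and `limit` handling without any network access.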
# src/datacortex/connectors/sync.py
from datetime import datetime
from typing import Optional

from .base import SyncResult
from .registry import ConnectorRegistry


class SyncManager:
    def __init__(self, config: dict):
        self.config = config
        self.results: dict[str, SyncResult] = {}

    async def sync_connector(
        self, name: str, force: bool = False, since: Optional[datetime] = None
    ) -> SyncResult:
        # Per-connector section of config/connectors.yaml.
        connector_config = self.config['connectors'][name]
        connector = ConnectorRegistry.create(name, connector_config)
        await connector.authenticate()
        for entity_type in connector_config['sync']['entities']:
            async for entity in connector.fetch_entities(entity_type, since=since):
                node = connector.to_node(entity)
                # `_store_external_node` is not defined in this issue.
                await self._store_external_node(node, entity)
        # Placeholder: populate counts, errors, duration, and last_sync.
        return SyncResult(...)

    async def sync_all(self, force: bool = False) -> list[SyncResult]:
        results = []
        for name, config in self.config['connectors'].items():
            if config.get('enabled'):
                results.append(await self.sync_connector(name, force=force))
        return results

Step 5: Add CLI Commands
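The commands in this step are synchronous callbacks, while the Step 4 sync manager is async, and `--since` arrives as a string. A possible bridge, sketched with only the standard library: `parse_since` is a hypothetical helper that assumes ISO 8601 input (for example `2024-01-31`), and `run_sync` stands in for the body of the `sync` command.

```python
import asyncio
from datetime import datetime, timezone
from typing import Any, Callable, Coroutine, Optional


def parse_since(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 date or datetime string.

    Values without a UTC offset are treated as UTC, which is an assumption
    of this sketch rather than something the issue specifies.
    """
    if value is None:
        return None
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def run_sync(
    sync: Callable[..., Coroutine[Any, Any, Any]],
    name: str,
    force: bool,
    since: Optional[str],
) -> Any:
    """Run an async sync function to completion from synchronous CLI code."""
    return asyncio.run(sync(name, force=force, since=parse_since(since)))
```

Inside the `sync` command this could be called as `run_sync(manager.sync_connector, name, force, since)`, with `manager` being a `SyncManager` built from the loaded config. An invalid `--since` value raises `ValueError` from `datetime.fromisoformat`, which the command can turn into a usage error.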
import click

# `cli` is assumed to be the project's top-level click group, defined elsewhere.


@cli.group()
def connector():
    """Manage external connectors."""
    pass


@connector.command('list')
def list_connectors():
    """List available connectors."""


@connector.command('sync')
@click.argument('name', required=False)
@click.option('--force', is_flag=True)
@click.option('--since')
def sync_connector(name, force, since):
    """Sync external connector(s)."""


@connector.command('test')
@click.argument('name')
def test_connector(name):
    """Test connector connection."""

Acceptance Criteria
- Connector base class defined
- Registry allows connector discovery
- Configuration file for connector settings
- Sync manager handles incremental updates
- CLI commands for listing, syncing, testing
- Error handling and reporting
- Rate limiting support in base class
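The criteria ask for rate limiting in the base class without saying how it should work. One simple option is to enforce a minimum interval between requests, and the sketch below assumes that approach. `MinIntervalLimiter` and `requests_per_second` are hypothetical names, and a real connector might instead honor the rate-limit headers its API returns.

```python
import asyncio
import time


class MinIntervalLimiter:
    """Spaces out calls so consecutive requests start at least one interval apart."""

    def __init__(self, requests_per_second: float):
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")
        self._interval = 1.0 / requests_per_second
        self._next_allowed = 0.0  # monotonic time at which the next call may start
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        """Block until the next request is allowed to start."""
        # The lock serializes callers so concurrent tasks cannot claim the same slot.
        async with self._lock:
            now = time.monotonic()
            delay = self._next_allowed - now
            if delay > 0:
                await asyncio.sleep(delay)
                now = time.monotonic()
            self._next_allowed = now + self._interval
```

A base class could hold one limiter per connector instance and call `await self._limiter.wait()` before each outgoing request. Creating the limiter inside the running event loop avoids lock/loop mismatches on older Python versions.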