-
Notifications
You must be signed in to change notification settings - Fork 1
feat: Add AsyncSSEClient with aiohttp-based async/await support #58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
keelerm84
wants to merge
3
commits into
main
Choose a base branch
from
mk/sdk-1400/async
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| import json | ||
| import logging | ||
| import os | ||
| import sys | ||
| from logging.config import dictConfig | ||
|
|
||
| import aiohttp.web | ||
| from async_stream_entity import AsyncStreamEntity | ||
|
|
||
| default_port = 8000 | ||
|
|
||
| dictConfig({ | ||
| 'version': 1, | ||
| 'formatters': { | ||
| 'default': { | ||
| 'format': '[%(asctime)s] [%(name)s] %(levelname)s: %(message)s', | ||
| } | ||
| }, | ||
| 'handlers': { | ||
| 'console': { | ||
| 'class': 'logging.StreamHandler', | ||
| 'formatter': 'default' | ||
| } | ||
| }, | ||
| 'root': { | ||
| 'level': 'INFO', | ||
| 'handlers': ['console'] | ||
| }, | ||
| }) | ||
|
|
||
| global_log = logging.getLogger('testservice') | ||
|
|
||
| stream_counter = 0 | ||
| streams = {} | ||
|
|
||
|
|
||
| async def handle_get_status(request): | ||
| body = { | ||
| 'capabilities': [ | ||
| 'comments', | ||
| 'headers', | ||
| 'last-event-id', | ||
| 'read-timeout', | ||
| ] | ||
| } | ||
| return aiohttp.web.Response( | ||
| body=json.dumps(body), | ||
| content_type='application/json', | ||
| ) | ||
|
|
||
|
|
||
| async def handle_delete_stop(request): | ||
| global_log.info("Test service has told us to exit") | ||
| os._exit(0) | ||
|
|
||
|
|
||
| async def handle_post_create_stream(request): | ||
| global stream_counter, streams | ||
|
|
||
| options = json.loads(await request.read()) | ||
|
|
||
| stream_counter += 1 | ||
| stream_id = str(stream_counter) | ||
| resource_url = '/streams/%s' % stream_id | ||
|
|
||
| stream = AsyncStreamEntity(options, request.app['http_session']) | ||
| streams[stream_id] = stream | ||
|
|
||
| return aiohttp.web.Response(status=201, headers={'Location': resource_url}) | ||
|
|
||
|
|
||
| async def handle_post_stream_command(request): | ||
| stream_id = request.match_info['id'] | ||
| params = json.loads(await request.read()) | ||
|
|
||
| stream = streams.get(stream_id) | ||
| if stream is None: | ||
| return aiohttp.web.Response(status=404) | ||
| if not await stream.do_command(params.get('command')): | ||
| return aiohttp.web.Response(status=400) | ||
| return aiohttp.web.Response(status=204) | ||
|
|
||
|
|
||
| async def handle_delete_stream(request): | ||
| stream_id = request.match_info['id'] | ||
|
|
||
| stream = streams.get(stream_id) | ||
| if stream is None: | ||
| return aiohttp.web.Response(status=404) | ||
| await stream.close() | ||
| return aiohttp.web.Response(status=204) | ||
|
|
||
|
|
||
| async def on_startup(app): | ||
| app['http_session'] = aiohttp.ClientSession() | ||
|
|
||
|
|
||
| async def on_cleanup(app): | ||
| await app['http_session'].close() | ||
|
|
||
|
|
||
| def make_app(): | ||
| app = aiohttp.web.Application() | ||
| app.router.add_get('/', handle_get_status) | ||
| app.router.add_delete('/', handle_delete_stop) | ||
| app.router.add_post('/', handle_post_create_stream) | ||
| app.router.add_post('/streams/{id}', handle_post_stream_command) | ||
| app.router.add_delete('/streams/{id}', handle_delete_stream) | ||
| app.on_startup.append(on_startup) | ||
| app.on_cleanup.append(on_cleanup) | ||
| return app | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| port = default_port | ||
| if sys.argv[len(sys.argv) - 1] != 'async_service.py': | ||
| port = int(sys.argv[len(sys.argv) - 1]) | ||
| global_log.info('Listening on port %d', port) | ||
| aiohttp.web.run_app(make_app(), host='0.0.0.0', port=port) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,116 @@ | ||
| import asyncio | ||
| import json | ||
| import logging | ||
| import os | ||
| import sys | ||
| import traceback | ||
|
|
||
| import aiohttp | ||
|
|
||
| # Import ld_eventsource from parent directory | ||
| sys.path.insert(1, os.path.join(sys.path[0], '..')) | ||
| from ld_eventsource.actions import Comment, Event, Fault # noqa: E402 | ||
| from ld_eventsource.async_client import AsyncSSEClient # noqa: E402 | ||
| from ld_eventsource.config.async_connect_strategy import \ | ||
| AsyncConnectStrategy # noqa: E402 | ||
| from ld_eventsource.config.error_strategy import ErrorStrategy # noqa: E402 | ||
|
|
||
|
|
||
| def millis_to_seconds(t): | ||
| return None if t is None else t / 1000 | ||
|
|
||
|
|
||
| class AsyncStreamEntity: | ||
| def __init__(self, options, http_session: aiohttp.ClientSession): | ||
| self.options = options | ||
| self.callback_url = options["callbackUrl"] | ||
| self.log = logging.getLogger(options["tag"]) | ||
| self.closed = False | ||
| self.callback_counter = 0 | ||
| self.sse = None | ||
| self._http_session = http_session | ||
| asyncio.create_task(self.run()) | ||
|
|
||
| async def run(self): | ||
| stream_url = self.options["streamUrl"] | ||
| try: | ||
| self.log.info('Opening stream from %s', stream_url) | ||
|
|
||
| request_options = {} | ||
| if self.options.get("readTimeoutMs") is not None: | ||
| request_options["timeout"] = aiohttp.ClientTimeout( | ||
| sock_read=millis_to_seconds(self.options.get("readTimeoutMs")) | ||
| ) | ||
|
|
||
| connect = AsyncConnectStrategy.http( | ||
| url=stream_url, | ||
| headers=self.options.get("headers"), | ||
| aiohttp_request_options=request_options if request_options else None, | ||
| ) | ||
| sse = AsyncSSEClient( | ||
| connect, | ||
| initial_retry_delay=millis_to_seconds(self.options.get("initialDelayMs")), | ||
| last_event_id=self.options.get("lastEventId"), | ||
| error_strategy=ErrorStrategy.from_lambda( | ||
| lambda _: ( | ||
| ErrorStrategy.FAIL if self.closed else ErrorStrategy.CONTINUE, | ||
| None, | ||
| ) | ||
| ), | ||
| logger=self.log, | ||
| ) | ||
| self.sse = sse | ||
| async for item in sse.all: | ||
| if isinstance(item, Event): | ||
| self.log.info('Received event from stream (%s)', item.event) | ||
| await self.send_message( | ||
| { | ||
| 'kind': 'event', | ||
| 'event': { | ||
| 'type': item.event, | ||
| 'data': item.data, | ||
| 'id': item.last_event_id, | ||
| }, | ||
| } | ||
| ) | ||
| elif isinstance(item, Comment): | ||
| self.log.info('Received comment from stream: %s', item.comment) | ||
| await self.send_message({'kind': 'comment', 'comment': item.comment}) | ||
| elif isinstance(item, Fault): | ||
| if self.closed: | ||
| break | ||
| if item.error: | ||
| self.log.info('Received error from stream: %s', item.error) | ||
| await self.send_message({'kind': 'error', 'error': str(item.error)}) | ||
| except Exception as e: | ||
| self.log.info('Received error from stream: %s', e) | ||
| self.log.info(traceback.format_exc()) | ||
| await self.send_message({'kind': 'error', 'error': str(e)}) | ||
|
|
||
| async def do_command(self, command: str) -> bool: | ||
| self.log.info('Test service sent command: %s' % command) | ||
| # currently we support no special commands | ||
| return False | ||
|
|
||
| async def send_message(self, message): | ||
| if self.closed: | ||
| return | ||
| self.callback_counter += 1 | ||
| callback_url = "%s/%d" % (self.callback_url, self.callback_counter) | ||
| try: | ||
| async with self._http_session.post( | ||
| callback_url, | ||
| data=json.dumps(message), | ||
| headers={'Content-Type': 'application/json'}, | ||
| ) as resp: | ||
| if resp.status >= 300 and not self.closed: | ||
| self.log.error('Callback request returned HTTP error %d', resp.status) | ||
| except Exception as e: | ||
| if not self.closed: | ||
| self.log.error('Callback request failed: %s', e) | ||
|
|
||
| async def close(self): | ||
| self.closed = True | ||
| if self.sse is not None: | ||
| await self.sse.close() | ||
| self.log.info('Test ended') | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,10 @@ | ||
| from ld_eventsource.sse_client import * | ||
|
|
||
|
|
||
| def __getattr__(name): | ||
| # Lazily import AsyncSSEClient so that aiohttp (an optional dependency) | ||
| # is never imported for sync-only users who don't have it installed. | ||
| if name == 'AsyncSSEClient': | ||
| from ld_eventsource.async_client import AsyncSSEClient | ||
| return AsyncSSEClient | ||
| raise AttributeError(f"module 'ld_eventsource' has no attribute {name!r}") |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, even without a read timeout, we need to customize the ClientTimeout.

The timeout uses a non-default ClientTimeout.
But if we do set it to anything, then the total will get set to None. https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientTimeout.total
But if we set a ClientTimeout, we would get the default total=None.