From 884ba046cb382d5f6e93410da472d59fa60dbd66 Mon Sep 17 00:00:00 2001 From: r-vdp Date: Fri, 13 Jun 2025 01:13:40 +0200 Subject: [PATCH] feat: split the client in a read-only and a read-write client --- golem_base_sdk/__init__.py | 476 ++++++++++++++++++++----------------- 1 file changed, 255 insertions(+), 221 deletions(-) diff --git a/golem_base_sdk/__init__.py b/golem_base_sdk/__init__.py index 095afe0..8ca5270 100755 --- a/golem_base_sdk/__init__.py +++ b/golem_base_sdk/__init__.py @@ -206,53 +206,29 @@ async def query_entities(self, query: str) -> Sequence[QueryEntitiesResult]: ) -class GolemBaseClient: - """ - The Golem Base client used to interact with Golem Base. - - Many useful methods are implemented directly on this type, while more - generic ethereum methods can be accessed through the underlying - web3 client that you can access with the - `GolemBaseClient.http_client()` - method. - """ - +class GolemBaseROClient: _http_client: GolemBaseHttpClient _ws_client: AsyncWeb3 _golem_base_contract: AsyncContract _background_tasks: set[asyncio.Task[None]] @staticmethod - async def create( - rpc_url: str, ws_url: str, private_key: bytes - ) -> "GolemBaseClient": + async def create_ro_client(rpc_url: str, ws_url: str) -> "GolemBaseROClient": """ Create a `GolemBaseClient` instance. This is the preferred method to create an instance. """ - ws_client = await AsyncWeb3(WebSocketProvider(ws_url)) - return GolemBaseClient(rpc_url, ws_client, private_key) - - async def _start_subscription_loop(self) -> None: - """Create a long running task to handle subscriptions.""" - # The loop will finish when there are no subscriptions left, so this method - # gets called every time a subscription is created, and we'll check - # whether we need to make a new task or whether one is already running. - if not self._background_tasks: - # Start the asyncio event loop - task = asyncio.create_task( - self.ws_client().subscription_manager.handle_subscriptions() - ) - self._background_tasks.add(task) - - def task_done(task: asyncio.Task[None]) -> None: - logger.info("Subscription background task done, removing...") - self._background_tasks.discard(task) + return GolemBaseROClient( + rpc_url, await GolemBaseROClient._create_ws_client(ws_url) + ) - task.add_done_callback(task_done) + @staticmethod + async def _create_ws_client(ws_url: str) -> "AsyncWeb3": + ws_client: AsyncWeb3 = await AsyncWeb3(WebSocketProvider(ws_url)) + return ws_client - def __init__(self, rpc_url: str, ws_client: AsyncWeb3, private_key: bytes) -> None: + def __init__(self, rpc_url: str, ws_client: AsyncWeb3) -> None: """Initialise the GolemBaseClient instance.""" self._http_client = GolemBaseHttpClient(rpc_url) self._ws_client = ws_client @@ -295,21 +271,6 @@ async def inner(show_traceback: bool) -> bool: # Allow caching of certain methods to improve performance self.http_client().provider.cache_allowed_requests = True - # Set up the ethereum account - self.account = self.http_client().eth.account.from_key(private_key) - # Inject a middleware that will sign transactions with the account that - # we created - self.http_client().middleware_onion.inject( - # pylint doesn't detect nested @curry annotations properly... - # pylint: disable=no-value-for-parameter - SignAndSendRawMiddlewareBuilder.build(self.account), - layer=0, - ) - # Set the account as the default, so we don't need to specify the from field - # every time - self.http_client().eth.default_account = self.account.address - logger.debug("Using account: %s", self.account.address) - # https://github.com/pylint-dev/pylint/issues/3162 # pylint: disable=no-member self.golem_base_contract = self.http_client().eth.contract( @@ -344,10 +305,6 @@ async def disconnect(self) -> None: await self.ws_client().subscription_manager.unsubscribe_all() await self.ws_client().provider.disconnect() - def get_account_address(self) -> ChecksumAddress: - """Get the address associated with the private key of this client.""" - return cast(ChecksumAddress, self.account.address) - async def get_storage_value(self, entity_key: EntityKey) -> bytes: """Get the storage value stored in the given entity.""" return await self.http_client().get_storage_value(entity_key) @@ -380,105 +337,102 @@ async def query_entities(self, query: str) -> Sequence[QueryEntitiesResult]: """Get all entities that satisfy the given Golem Base query.""" return await self.http_client().query_entities(query) - async def create_entities( + async def watch_logs( self, - creates: Sequence[GolemBaseCreate], *, - gas: int | None = None, - maxFeePerGas: Wei | None = None, - maxPriorityFeePerGas: Wei | None = None, - ) -> Sequence[CreateEntityReturnType]: - """Create entities in Golem Base.""" - return ( - await self.send_transaction( - creates=creates, - gas=gas, - maxFeePerGas=maxFeePerGas, - maxPriorityFeePerGas=maxPriorityFeePerGas, - ) - ).creates + label: str, + create_callback: Callable[[CreateEntityReturnType], None] | None = None, + update_callback: Callable[[UpdateEntityReturnType], None] | None = None, + delete_callback: Callable[[EntityKey], None] | None = None, + extend_callback: Callable[[ExtendEntityReturnType], None] | None = None, + ) -> WatchLogsHandle: + """ + Subscribe to events on Golem Base. - async def update_entities( - self, - updates: Sequence[GolemBaseUpdate], - *, - gas: int | None = None, - maxFeePerGas: Wei | None = None, - maxPriorityFeePerGas: Wei | None = None, - ) -> Sequence[UpdateEntityReturnType]: - """Update entities in Golem Base.""" - return ( - await self.send_transaction( - updates=updates, - gas=gas, - maxFeePerGas=maxFeePerGas, - maxPriorityFeePerGas=maxPriorityFeePerGas, + You can pass in four different callbacks, and the right one will + be invoked for every create, update, delete, and extend operation. + """ + + async def log_handler( + handler_context: LogsSubscriptionContext, + ) -> None: + # We only use this handler for log receipts + # TypeDicts cannot be checked at runtime + log_receipt = typing.cast(LogReceipt, handler_context.result) + logger.debug("New log: %s", log_receipt) + res = await self._process_golem_base_log_receipt(log_receipt) + + if create_callback: + for create in res.creates: + create_callback(create) + if update_callback: + for update in res.updates: + update_callback(update) + if delete_callback: + for key in res.deletes: + delete_callback(key) + if extend_callback: + for extension in res.extensions: + extend_callback(extension) + + def create_subscription(topic: HexStr) -> LogsSubscription: + return LogsSubscription( + label=f"Golem Base subscription to topic {topic} with label {label}", + address=self.golem_base_contract.address, + topics=[topic], + handler=log_handler, + # optional `handler_context` args to help parse a response + handler_context={}, ) - ).updates - async def delete_entities( - self, - deletes: Sequence[GolemBaseDelete], - *, - gas: int | None = None, - maxFeePerGas: Wei | None = None, - maxPriorityFeePerGas: Wei | None = None, - ) -> Sequence[EntityKey]: - """Delete entities from Golem Base.""" - return ( - await self.send_transaction( - deletes=deletes, - gas=gas, - maxFeePerGas=maxFeePerGas, - maxPriorityFeePerGas=maxPriorityFeePerGas, + event_names = [] + if create_callback: + event_names.append("GolemBaseStorageEntityCreated") + if update_callback: + event_names.append("GolemBaseStorageEntityUpdated") + if delete_callback: + event_names.append("GolemBaseStorageEntityDeleted") + if extend_callback: + event_names.append("GolemBaseStorageEntityBTLExtended") + + events = list( + map( + lambda event_name: create_subscription( + self.golem_base_contract.get_event_by_name(event_name).topic + ), + event_names, ) - ).deletes + ) + subscription_ids = await self._ws_client.subscription_manager.subscribe( + events, + ) + logger.info("Sub ID: %s", subscription_ids) - async def extend_entities( - self, - extensions: Sequence[GolemBaseExtend], - *, - gas: int | None = None, - maxFeePerGas: Wei | None = None, - maxPriorityFeePerGas: Wei | None = None, - ) -> Sequence[ExtendEntityReturnType]: - """Extend the BTL of entities in Golem Base.""" - return ( - await self.send_transaction( - extensions=extensions, - gas=gas, - maxFeePerGas=maxFeePerGas, - maxPriorityFeePerGas=maxPriorityFeePerGas, + # Start a subscription loop in case there is none running + await self._start_subscription_loop() + + async def unsubscribe() -> None: + await self._ws_client.subscription_manager.unsubscribe(subscription_ids) + + return WatchLogsHandle(_unsubscribe=unsubscribe) + + async def _start_subscription_loop(self) -> None: + """Create a long running task to handle subscriptions.""" + # The loop will finish when there are no subscriptions left, so this method + # gets called every time a subscription is created, and we'll check + # whether we need to make a new task or whether one is already running. + if not self._background_tasks: + # Start the asyncio event loop + task = asyncio.create_task( + self.ws_client().subscription_manager.handle_subscriptions() ) - ).extensions + self._background_tasks.add(task) - async def send_transaction( - self, - *, - creates: Sequence[GolemBaseCreate] | None = None, - updates: Sequence[GolemBaseUpdate] | None = None, - deletes: Sequence[GolemBaseDelete] | None = None, - extensions: Sequence[GolemBaseExtend] | None = None, - gas: int | None = None, - maxFeePerGas: Wei | None = None, - maxPriorityFeePerGas: Wei | None = None, - ) -> GolemBaseTransactionReceipt: - """ - Send a generic transaction to Golem Base. + def task_done(task: asyncio.Task[None]) -> None: + logger.info("Subscription background task done, removing...") + self._background_tasks.discard(task) - This transaction can contain multiple create, update, delete and - extend operations. - """ - tx = GolemBaseTransaction( - creates=creates, - updates=updates, - deletes=deletes, - extensions=extensions, - gas=gas, - maxFeePerGas=maxFeePerGas, - maxPriorityFeePerGas=maxPriorityFeePerGas, - ) - return await self._send_gb_transaction(tx) + task.add_done_callback(task_done) async def _process_golem_base_log_receipt( self, @@ -582,6 +536,165 @@ async def process_receipt( extensions=extensions, ) + +class GolemBaseClient(GolemBaseROClient): + """ + The Golem Base client used to interact with Golem Base. + + Many useful methods are implemented directly on this type, while more + generic ethereum methods can be accessed through the underlying + web3 client that you can access with the + `GolemBaseClient.http_client()` + method. + """ + + @staticmethod + async def create_rw_client( + rpc_url: str, ws_url: str, private_key: bytes + ) -> "GolemBaseClient": + """ + Create a read-write Golem Base client. + + This is the preferred method to create an instance. + """ + return GolemBaseClient( + rpc_url, await GolemBaseROClient._create_ws_client(ws_url), private_key + ) + + @staticmethod + async def create( + rpc_url: str, ws_url: str, private_key: bytes + ) -> "GolemBaseClient": + """ + Create a read-write Golem Base client. + + This method is deprecated in favour of `GolemBaseClient.create_rw_client()`. + """ + return await GolemBaseClient.create_rw_client(rpc_url, ws_url, private_key) + + def __init__(self, rpc_url: str, ws_client: AsyncWeb3, private_key: bytes) -> None: + """Initialise the GolemBaseClient instance.""" + super().__init__(rpc_url, ws_client) + + # Set up the ethereum account + self.account = self.http_client().eth.account.from_key(private_key) + # Inject a middleware that will sign transactions with the account that + # we created + self.http_client().middleware_onion.inject( + # pylint doesn't detect nested @curry annotations properly... + # pylint: disable=no-value-for-parameter + SignAndSendRawMiddlewareBuilder.build(self.account), + layer=0, + ) + # Set the account as the default, so we don't need to specify the from field + # every time + self.http_client().eth.default_account = self.account.address + logger.debug("Using account: %s", self.account.address) + + def get_account_address(self) -> ChecksumAddress: + """Get the address associated with the private key of this client.""" + return cast(ChecksumAddress, self.account.address) + + async def create_entities( + self, + creates: Sequence[GolemBaseCreate], + *, + gas: int | None = None, + maxFeePerGas: Wei | None = None, + maxPriorityFeePerGas: Wei | None = None, + ) -> Sequence[CreateEntityReturnType]: + """Create entities in Golem Base.""" + return ( + await self.send_transaction( + creates=creates, + gas=gas, + maxFeePerGas=maxFeePerGas, + maxPriorityFeePerGas=maxPriorityFeePerGas, + ) + ).creates + + async def update_entities( + self, + updates: Sequence[GolemBaseUpdate], + *, + gas: int | None = None, + maxFeePerGas: Wei | None = None, + maxPriorityFeePerGas: Wei | None = None, + ) -> Sequence[UpdateEntityReturnType]: + """Update entities in Golem Base.""" + return ( + await self.send_transaction( + updates=updates, + gas=gas, + maxFeePerGas=maxFeePerGas, + maxPriorityFeePerGas=maxPriorityFeePerGas, + ) + ).updates + + async def delete_entities( + self, + deletes: Sequence[GolemBaseDelete], + *, + gas: int | None = None, + maxFeePerGas: Wei | None = None, + maxPriorityFeePerGas: Wei | None = None, + ) -> Sequence[EntityKey]: + """Delete entities from Golem Base.""" + return ( + await self.send_transaction( + deletes=deletes, + gas=gas, + maxFeePerGas=maxFeePerGas, + maxPriorityFeePerGas=maxPriorityFeePerGas, + ) + ).deletes + + async def extend_entities( + self, + extensions: Sequence[GolemBaseExtend], + *, + gas: int | None = None, + maxFeePerGas: Wei | None = None, + maxPriorityFeePerGas: Wei | None = None, + ) -> Sequence[ExtendEntityReturnType]: + """Extend the BTL of entities in Golem Base.""" + return ( + await self.send_transaction( + extensions=extensions, + gas=gas, + maxFeePerGas=maxFeePerGas, + maxPriorityFeePerGas=maxPriorityFeePerGas, + ) + ).extensions + + async def send_transaction( + self, + *, + creates: Sequence[GolemBaseCreate] | None = None, + updates: Sequence[GolemBaseUpdate] | None = None, + deletes: Sequence[GolemBaseDelete] | None = None, + extensions: Sequence[GolemBaseExtend] | None = None, + gas: int | None = None, + maxFeePerGas: Wei | None = None, + maxPriorityFeePerGas: Wei | None = None, + ) -> GolemBaseTransactionReceipt: + """ + Send a generic transaction to Golem Base. + + This transaction can contain multiple create, update, delete and + extend operations. + """ + tx = GolemBaseTransaction( + creates=creates, + updates=updates, + deletes=deletes, + extensions=extensions, + gas=gas, + maxFeePerGas=maxFeePerGas, + maxPriorityFeePerGas=maxPriorityFeePerGas, + ) + return await self._send_gb_transaction(tx) + async def _send_gb_transaction( self, tx: GolemBaseTransaction ) -> GolemBaseTransactionReceipt: @@ -629,82 +742,3 @@ async def _send_gb_transaction( raise e return await self._process_golem_base_receipt(receipt) - - async def watch_logs( - self, - *, - label: str, - create_callback: Callable[[CreateEntityReturnType], None] | None = None, - update_callback: Callable[[UpdateEntityReturnType], None] | None = None, - delete_callback: Callable[[EntityKey], None] | None = None, - extend_callback: Callable[[ExtendEntityReturnType], None] | None = None, - ) -> WatchLogsHandle: - """ - Subscribe to events on Golem Base. - - You can pass in four different callbacks, and the right one will - be invoked for every create, update, delete, and extend operation. - """ - - async def log_handler( - handler_context: LogsSubscriptionContext, - ) -> None: - # We only use this handler for log receipts - # TypeDicts cannot be checked at runtime - log_receipt = typing.cast(LogReceipt, handler_context.result) - logger.debug("New log: %s", log_receipt) - res = await self._process_golem_base_log_receipt(log_receipt) - - if create_callback: - for create in res.creates: - create_callback(create) - if update_callback: - for update in res.updates: - update_callback(update) - if delete_callback: - for key in res.deletes: - delete_callback(key) - if extend_callback: - for extension in res.extensions: - extend_callback(extension) - - def create_subscription(topic: HexStr) -> LogsSubscription: - return LogsSubscription( - label=f"Golem Base subscription to topic {topic} with label {label}", - address=self.golem_base_contract.address, - topics=[topic], - handler=log_handler, - # optional `handler_context` args to help parse a response - handler_context={}, - ) - - event_names = [] - if create_callback: - event_names.append("GolemBaseStorageEntityCreated") - if update_callback: - event_names.append("GolemBaseStorageEntityUpdated") - if delete_callback: - event_names.append("GolemBaseStorageEntityDeleted") - if extend_callback: - event_names.append("GolemBaseStorageEntityBTLExtended") - - events = list( - map( - lambda event_name: create_subscription( - self.golem_base_contract.get_event_by_name(event_name).topic - ), - event_names, - ) - ) - subscription_ids = await self._ws_client.subscription_manager.subscribe( - events, - ) - logger.info("Sub ID: %s", subscription_ids) - - # Start a subscription loop in case there is none running - await self._start_subscription_loop() - - async def unsubscribe() -> None: - await self._ws_client.subscription_manager.unsubscribe(subscription_ids) - - return WatchLogsHandle(_unsubscribe=unsubscribe)