diff --git a/src/sagittarius/mod.rs b/src/sagittarius/mod.rs index 08af9d9..a128886 100644 --- a/src/sagittarius/mod.rs +++ b/src/sagittarius/mod.rs @@ -5,4 +5,5 @@ pub mod flow_type_service_client_impl; pub mod retry; pub mod runtime_function_service_client_impl; pub mod runtime_status_service_client_impl; +pub mod runtime_usage_client_impl; pub mod test_execution_client_impl; diff --git a/src/sagittarius/runtime_usage_client_impl.rs b/src/sagittarius/runtime_usage_client_impl.rs new file mode 100644 index 0000000..5246cea --- /dev/null +++ b/src/sagittarius/runtime_usage_client_impl.rs @@ -0,0 +1,56 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; +use tonic::{Extensions, Request, transport::Channel}; +use tucana::sagittarius::runtime_usage_service_client::RuntimeUsageServiceClient as SagittariusRuntimeUsageServiceClient; + +use crate::authorization::authorization::get_authorization_metadata; + +pub struct SagittariusRuntimeUsageClient { + client: SagittariusRuntimeUsageServiceClient, + token: String, +} + +impl SagittariusRuntimeUsageClient { + pub fn new(channel: Channel, token: String) -> Self { + let client = SagittariusRuntimeUsageServiceClient::new(channel); + Self { client, token } + } + + pub fn new_arc(channel: Channel, token: String) -> Arc> { + Arc::new(Mutex::new(Self::new(channel, token))) + } + + pub async fn update_runtime_usage( + &mut self, + runtime_usage_request: tucana::aquila::RuntimeUsageRequest, + ) -> tucana::aquila::RuntimeUsageResponse { + let request = Request::from_parts( + get_authorization_metadata(&self.token), + Extensions::new(), + tucana::sagittarius::RuntimeUsageRequest { + runtime_usage: runtime_usage_request.runtime_usage, + }, + ); + + let response = match self.client.update(request).await { + Ok(response) => { + log::info!("Successfully transferred Runtime Usages.",); + response.into_inner() + } + Err(err) => { + log::error!("Failed to update Runtime Usage: {:?}", err); + return tucana::aquila::RuntimeUsageResponse { success: false }; + } + }; + + match response.success { + true => log::info!("Sagittarius successfully updated RuntimeUsage."), + false => log::error!("Sagittarius didn't update RuntimeUsage."), + }; + + tucana::aquila::RuntimeUsageResponse { + success: response.success, + } + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 1d1c175..a36c9c1 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,8 +5,10 @@ use crate::{ flow_type_service_client_impl::SagittariusFlowTypeServiceClient, runtime_function_service_client_impl::SagittariusRuntimeFunctionServiceClient, runtime_status_service_client_impl::SagittariusRuntimeStatusServiceClient, + runtime_usage_client_impl::SagittariusRuntimeUsageClient, }, server::runtime_status_service_server_impl::AquilaRuntimeStatusServiceServer, + server::runtime_usage_service_server_impl::AquilaRuntimeUsageServiceServer, }; use data_type_service_server_impl::AquilaDataTypeServiceServer; use flow_type_service_server_impl::AquilaFlowTypeServiceServer; @@ -23,12 +25,14 @@ use tucana::aquila::{ flow_type_service_server::FlowTypeServiceServer, runtime_function_definition_service_server::RuntimeFunctionDefinitionServiceServer, runtime_status_service_server::RuntimeStatusServiceServer, + runtime_usage_service_server::RuntimeUsageServiceServer, }; mod data_type_service_server_impl; mod flow_type_service_server_impl; mod runtime_function_service_server_impl; mod runtime_status_service_server_impl; +mod runtime_usage_service_server_impl; pub struct AquilaGRPCServer { token: String, @@ -79,6 +83,13 @@ impl AquilaGRPCServer { info!("RuntimeFunctionService started"); + let runtime_usage_service = Arc::new(Mutex::new(SagittariusRuntimeUsageClient::new( + self.channel.clone(), + self.token.clone(), + ))); + + info!("RuntimeUsageService started"); + let runtime_status_service = Arc::new(Mutex::new( SagittariusRuntimeStatusServiceClient::new(self.channel.clone(), self.token.clone()), )); @@ -89,6 +100,8 @@ impl AquilaGRPCServer { let flow_type_server = AquilaFlowTypeServiceServer::new(flow_type_service.clone()); let runtime_function_server = AquilaRuntimeFunctionServiceServer::new(runtime_function_service.clone()); + let runtime_usage_server = + AquilaRuntimeUsageServiceServer::new(runtime_usage_service.clone()); let runtime_status_server = AquilaRuntimeStatusServiceServer::new(runtime_status_service.clone()); @@ -130,6 +143,10 @@ impl AquilaGRPCServer { runtime_function_server, intercept.clone(), )) + .add_service(RuntimeUsageServiceServer::with_interceptor( + runtime_usage_server, + intercept.clone(), + )) .add_service(RuntimeStatusServiceServer::with_interceptor( runtime_status_server, intercept.clone(), @@ -150,6 +167,10 @@ impl AquilaGRPCServer { runtime_function_server, intercept.clone(), )) + .add_service(RuntimeUsageServiceServer::with_interceptor( + runtime_usage_server, + intercept.clone(), + )) .add_service(RuntimeStatusServiceServer::with_interceptor( runtime_status_server, intercept.clone(), diff --git a/src/server/runtime_usage_service_server_impl.rs b/src/server/runtime_usage_service_server_impl.rs new file mode 100644 index 0000000..a9422ac --- /dev/null +++ b/src/server/runtime_usage_service_server_impl.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; +use tucana::aquila::runtime_usage_service_server::RuntimeUsageService; + +use crate::sagittarius::runtime_usage_client_impl::SagittariusRuntimeUsageClient; + +pub struct AquilaRuntimeUsageServiceServer { + client: Arc>, +} + +impl AquilaRuntimeUsageServiceServer { + pub fn new(client: Arc>) -> Self { + Self { client } + } +} + +#[tonic::async_trait] +impl RuntimeUsageService for AquilaRuntimeUsageServiceServer { + async fn update( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + let runtime_usage_request = request.into_inner(); + + log::debug!("Received RuntimeUsageRequest",); + + let mut client = self.client.lock().await; + let response = client + .update_runtime_usage(runtime_usage_request) + .await; + + Ok(tonic::Response::new(tucana::aquila::RuntimeUsageResponse { + success: response.success, + })) + } +}