Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sagittarius/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pub mod flow_type_service_client_impl;
pub mod retry;
pub mod runtime_function_service_client_impl;
pub mod runtime_status_service_client_impl;
pub mod runtime_usage_client_impl;
pub mod test_execution_client_impl;
56 changes: 56 additions & 0 deletions src/sagittarius/runtime_usage_client_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use std::sync::Arc;

use tokio::sync::Mutex;
use tonic::{Extensions, Request, transport::Channel};
use tucana::sagittarius::runtime_usage_service_client::RuntimeUsageServiceClient as SagittariusRuntimeUsageServiceClient;

use crate::authorization::authorization::get_authorization_metadata;

pub struct SagittariusRuntimeUsageClient {
client: SagittariusRuntimeUsageServiceClient<Channel>,
token: String,
}

impl SagittariusRuntimeUsageClient {
pub fn new(channel: Channel, token: String) -> Self {
let client = SagittariusRuntimeUsageServiceClient::new(channel);
Self { client, token }
}

pub fn new_arc(channel: Channel, token: String) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(Self::new(channel, token)))
}

pub async fn update_runtime_usage(
&mut self,
runtime_usage_request: tucana::aquila::RuntimeUsageRequest,
) -> tucana::aquila::RuntimeUsageResponse {
let request = Request::from_parts(
get_authorization_metadata(&self.token),
Extensions::new(),
tucana::sagittarius::RuntimeUsageRequest {
runtime_usage: runtime_usage_request.runtime_usage,
},
);

let response = match self.client.update(request).await {
Ok(response) => {
log::info!("Successfully transferred Runtime Usages.",);
response.into_inner()
}
Err(err) => {
log::error!("Failed to update Runtime Usage: {:?}", err);
return tucana::aquila::RuntimeUsageResponse { success: false };
}
};

match response.success {
true => log::info!("Sagittarius successfully updated RuntimeUsage."),
false => log::error!("Sagittarius didn't update RuntimeUsage."),
};

tucana::aquila::RuntimeUsageResponse {
success: response.success,
}
}
}
21 changes: 21 additions & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use crate::{
flow_type_service_client_impl::SagittariusFlowTypeServiceClient,
runtime_function_service_client_impl::SagittariusRuntimeFunctionServiceClient,
runtime_status_service_client_impl::SagittariusRuntimeStatusServiceClient,
runtime_usage_client_impl::SagittariusRuntimeUsageClient,
},
server::runtime_status_service_server_impl::AquilaRuntimeStatusServiceServer,
server::runtime_usage_service_server_impl::AquilaRuntimeUsageServiceServer,
};
use data_type_service_server_impl::AquilaDataTypeServiceServer;
use flow_type_service_server_impl::AquilaFlowTypeServiceServer;
Expand All @@ -23,12 +25,14 @@ use tucana::aquila::{
flow_type_service_server::FlowTypeServiceServer,
runtime_function_definition_service_server::RuntimeFunctionDefinitionServiceServer,
runtime_status_service_server::RuntimeStatusServiceServer,
runtime_usage_service_server::RuntimeUsageServiceServer,
};

mod data_type_service_server_impl;
mod flow_type_service_server_impl;
mod runtime_function_service_server_impl;
mod runtime_status_service_server_impl;
mod runtime_usage_service_server_impl;

pub struct AquilaGRPCServer {
token: String,
Expand Down Expand Up @@ -79,6 +83,13 @@ impl AquilaGRPCServer {

info!("RuntimeFunctionService started");

let runtime_usage_service = Arc::new(Mutex::new(SagittariusRuntimeUsageClient::new(
self.channel.clone(),
self.token.clone(),
)));

info!("RuntimeUsageService started");

let runtime_status_service = Arc::new(Mutex::new(
SagittariusRuntimeStatusServiceClient::new(self.channel.clone(), self.token.clone()),
));
Expand All @@ -89,6 +100,8 @@ impl AquilaGRPCServer {
let flow_type_server = AquilaFlowTypeServiceServer::new(flow_type_service.clone());
let runtime_function_server =
AquilaRuntimeFunctionServiceServer::new(runtime_function_service.clone());
let runtime_usage_server =
AquilaRuntimeUsageServiceServer::new(runtime_usage_service.clone());
let runtime_status_server =
AquilaRuntimeStatusServiceServer::new(runtime_status_service.clone());

Expand Down Expand Up @@ -130,6 +143,10 @@ impl AquilaGRPCServer {
runtime_function_server,
intercept.clone(),
))
.add_service(RuntimeUsageServiceServer::with_interceptor(
runtime_usage_server,
intercept.clone(),
))
.add_service(RuntimeStatusServiceServer::with_interceptor(
runtime_status_server,
intercept.clone(),
Expand All @@ -150,6 +167,10 @@ impl AquilaGRPCServer {
runtime_function_server,
intercept.clone(),
))
.add_service(RuntimeUsageServiceServer::with_interceptor(
runtime_usage_server,
intercept.clone(),
))
.add_service(RuntimeStatusServiceServer::with_interceptor(
runtime_status_server,
intercept.clone(),
Expand Down
37 changes: 37 additions & 0 deletions src/server/runtime_usage_service_server_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::sync::Arc;

use tokio::sync::Mutex;
use tucana::aquila::runtime_usage_service_server::RuntimeUsageService;

use crate::sagittarius::runtime_usage_client_impl::SagittariusRuntimeUsageClient;

pub struct AquilaRuntimeUsageServiceServer {
client: Arc<Mutex<SagittariusRuntimeUsageClient>>,
}

impl AquilaRuntimeUsageServiceServer {
pub fn new(client: Arc<Mutex<SagittariusRuntimeUsageClient>>) -> Self {
Self { client }
}
}

#[tonic::async_trait]
impl RuntimeUsageService for AquilaRuntimeUsageServiceServer {
async fn update(
&self,
request: tonic::Request<tucana::aquila::RuntimeUsageRequest>,
) -> Result<tonic::Response<tucana::aquila::RuntimeUsageResponse>, tonic::Status> {
let runtime_usage_request = request.into_inner();

log::debug!("Received RuntimeUsageRequest",);

let mut client = self.client.lock().await;
let response = client
.update_runtime_usage(runtime_usage_request)
.await;

Ok(tonic::Response::new(tucana::aquila::RuntimeUsageResponse {
success: response.success,
}))
}
}