Added GRPC interface

+ Handles full throughput from GRPC API to robot and back
+ Added in-band error responses
main
Joey Hines 2024-06-22 19:25:11 -06:00
parent 0136772c23
commit 6129060319
Signed by: joeyahines
GPG Key ID: 995E531F7A569DDB
10 changed files with 1066 additions and 90 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/target /target
.idea/ .idea/
config.toml

922
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -6,10 +6,17 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
raas_types = { version = "0.0.5", features = ["async"], registry = "jojo-dev"} raas_types = { version = "0.0.9", features = ["async"], registry = "jojo-dev"}
prost = "0.12.6" prost = "0.12.6"
tokio = {version = "1.38.0", features = ["io-util", "net", "rt-multi-thread"]} tokio = {version = "1.38.0", features = ["io-util", "net", "rt-multi-thread"]}
axum = "0.7.5" axum = "0.7.5"
env_logger = "0.11.3" env_logger = "0.11.3"
log = "0.4.21" log = "0.4.21"
thiserror = "1.0.61" thiserror = "1.0.61"
tonic = "0.11.0"
config = "0.14.0"
serde = "1.0.203"
structopt = "0.3.26"
[build-dependencies]
tonic-build = "0.11.0"

25
src/config.rs 100644
View File

@ -0,0 +1,25 @@
use config::{Config, File};
use serde::Deserialize;
use std::path::{Path, PathBuf};
use structopt::StructOpt;
#[derive(Debug, StructOpt)]
pub struct Args {
pub cfg_path: PathBuf,
}
#[derive(Debug, Clone, Deserialize)]
pub struct RaasConfig {
pub grpc_server: String,
pub robot_server: String,
}
impl RaasConfig {
pub fn new(config_path: &Path) -> Result<Self, config::ConfigError> {
let cfg = Config::builder()
.add_source(File::from(config_path))
.build()?;
cfg.try_deserialize()
}
}

View File

@ -1,26 +1,37 @@
use crate::config::{Args, RaasConfig};
use crate::robot::connection_handler::ConnectionHandler; use crate::robot::connection_handler::ConnectionHandler;
use crate::robot::robot_manager::RobotManager; use crate::robot::robot_manager::RobotManager;
use crate::robot::ConnectorManager; use crate::robot::{ConnectorManager, ROBOT_MESSAGE_QUEUE_SIZE};
use log::{error, info}; use log::{error, info};
use prost::Message; use raas_types::raas::service::raas_server::RaasServer;
use raas_types::raas; use server::grpc::RaaSServer;
use raas_types::raas::cmd::Request;
use raas_types::raas::ping::Ping;
use raas_types::raas::{recv_raas_msg, send_raas_msg};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use structopt::StructOpt;
use tokio::io::duplex; use tokio::io::duplex;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tonic::transport::Server;
mod config;
mod robot; mod robot;
mod server;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
env_logger::init(); env_logger::init();
info!("Starting..."); info!("Starting...");
let addr = "0.0.0.0:8080"; let args = Args::from_args();
let config = RaasConfig::new(&args.cfg_path);
let config = match config {
Ok(c) => c,
Err(err) => {
error!("Error loading config: {}", err);
return;
}
};
let connector_manager = ConnectorManager { let connector_manager = ConnectorManager {
next_id: 0, next_id: 0,
@ -29,55 +40,22 @@ async fn main() {
let connector_manager = Arc::new(Mutex::new(connector_manager)); let connector_manager = Arc::new(Mutex::new(connector_manager));
let tcp_listener = TcpListener::bind(addr).await.unwrap(); let tcp_listener = TcpListener::bind(&config.robot_server).await.unwrap();
let (mut main_stream, manager_stream) = duplex(10); let (main_stream, manager_stream) = duplex(ROBOT_MESSAGE_QUEUE_SIZE);
let robot_manager = RobotManager::new(manager_stream); let robot_manager = RobotManager::new(manager_stream);
let connector_handler = ConnectionHandler::new(addr, tcp_listener); let connector_handler = ConnectionHandler::new(&config.robot_server, tcp_listener);
let robot_manager_connector_manager = connector_manager.clone(); let robot_manager_connector_manager = connector_manager.clone();
tokio::task::spawn(robot_manager.worker(robot_manager_connector_manager)); tokio::task::spawn(robot_manager.worker(robot_manager_connector_manager));
tokio::task::spawn(connector_handler.worker(connector_manager)); tokio::task::spawn(connector_handler.worker(connector_manager));
loop { info!("Serving grpc at {}", &config.grpc_server);
info!("Sleeping"); let raas_server = RaaSServer::new(main_stream);
tokio::time::sleep(Duration::from_secs(5)).await; Server::builder()
info!("Sending Ping"); .add_service(RaasServer::new(raas_server))
.serve(config.grpc_server.parse().unwrap())
let ping = Ping { .await
data: "ping msg".to_string(), .unwrap()
};
let req = Request {
timestamp: 0,
cmd: Some(raas::cmd::request::Cmd::RollCmd(raas::bot::roll::RollCmd {
cmd: Some(raas::bot::roll::roll_cmd::Cmd::Ping(ping)),
})),
};
let mut msg = Vec::new();
req.encode(&mut msg).unwrap();
send_raas_msg(&mut main_stream, msg).await.unwrap();
info!("Wait for resp");
let ret =
tokio::time::timeout(Duration::from_secs(5), recv_raas_msg(&mut main_stream)).await;
match ret {
Ok(resp) => match resp {
Ok(resp) => {
info!("Got response {:?}", resp);
}
Err(err) => {
error!("Error getting response: {}", err)
}
},
Err(_) => {
error!("Timed out waiting for response!")
}
}
}
} }

View File

@ -29,6 +29,8 @@ pub enum Error {
ProtobufEncode(#[from] prost::EncodeError), ProtobufEncode(#[from] prost::EncodeError),
#[error("Protobuf Decode Error")] #[error("Protobuf Decode Error")]
ProtobufDecode(#[from] prost::DecodeError), ProtobufDecode(#[from] prost::DecodeError),
#[error("Connection to bot has been closed")] #[error("Connection to robot has been closed")]
ConnectionClosed(u32), ConnectionClosed(u32),
#[error("No robots to handle requests")]
NoRobotsToHandleRequest,
} }

View File

@ -61,7 +61,7 @@ impl RobotConnector {
let recv = recv_raas_msg(&mut self.tcp_stream).await?; let recv = recv_raas_msg(&mut self.tcp_stream).await?;
let resp = Response::decode(recv.msg.as_slice()).unwrap(); let resp = Response::decode(recv.msg.as_slice()).unwrap();
info!("Worker (bot_id={}) got resp: {:?}", self.bot_id, resp); info!("Worker (bot_id={}) got resp", self.bot_id);
Ok(resp) Ok(resp)
} }

View File

@ -33,6 +33,30 @@ impl RobotManager {
Ok(()) Ok(())
} }
async fn send_error(
&mut self,
id: u32,
error: raas::error::ErrorType,
err_msg: &str,
) -> Result<(), Error> {
let resp = raas::resp::Response {
id,
timestamp: 0,
resp: Some(raas::resp::response::Resp::Error(raas::error::Error {
err: error.into(),
msg: err_msg.to_string(),
})),
};
let mut msg = Vec::new();
resp.encode(&mut msg)?;
self.send_resp(RaasMessage::new(msg)).await?;
Ok(())
}
async fn handle_command( async fn handle_command(
&mut self, &mut self,
connector_manager: Arc<Mutex<ConnectorManager>>, connector_manager: Arc<Mutex<ConnectorManager>>,
@ -64,6 +88,7 @@ impl RobotManager {
"Got '{}' sending message to Robot Connector id={}, closing connection", "Got '{}' sending message to Robot Connector id={}, closing connection",
err, id err, id
); );
return Err(Error::ConnectionClosed(*id)); return Err(Error::ConnectionClosed(*id));
} }
@ -72,6 +97,7 @@ impl RobotManager {
self.send_resp(resp).await?; self.send_resp(resp).await?;
} else { } else {
error!("No connectors available to handle request"); error!("No connectors available to handle request");
return Err(Error::NoRobotsToHandleRequest);
} }
Ok(()) Ok(())
@ -87,11 +113,40 @@ impl RobotManager {
if let Err(err) = res { if let Err(err) = res {
error!("Got error in robot manager: {}", err); error!("Got error in robot manager: {}", err);
if let Error::ConnectionClosed(id) = err { let (id, error, error_msg) = match err {
Error::ConnectionClosed(id) => {
let mut connection_manager = connector_manager.lock().await; let mut connection_manager = connector_manager.lock().await;
connection_manager.connectors.remove(&id); connection_manager.connectors.remove(&id);
} (
id,
raas::error::ErrorType::RobotOffline,
"Robot offline.".to_string(),
)
}
Error::Io(err) => (
0,
raas::error::ErrorType::RobotError,
format!("IO Error: {}", err),
),
Error::ProtobufEncode(err) => (
0,
raas::error::ErrorType::RobotError,
format!("Encode Error: {}", err),
),
Error::ProtobufDecode(err) => (
0,
raas::error::ErrorType::RobotError,
format!("Decode Error: {}", err),
),
Error::NoRobotsToHandleRequest => (
0,
raas::error::ErrorType::NoRobotsToHandleRequest,
"No robots to handle request".to_string(),
),
};
self.send_error(id, error, error_msg.as_str()).await?;
} }
} }
} }

47
src/server/grpc.rs 100644
View File

@ -0,0 +1,47 @@
use log::info;
use prost::Message;
use raas_types::raas::service::raas_server::Raas;
use raas_types::raas::{recv_raas_msg, send_raas_msg};
use std::ops::DerefMut;
use tokio::io::DuplexStream;
use tonic::{Request, Response, Status};
pub struct RaaSServer {
command_stream: tokio::sync::Mutex<DuplexStream>,
}
impl RaaSServer {
pub fn new(command_stream: DuplexStream) -> Self {
Self {
command_stream: tokio::sync::Mutex::new(command_stream),
}
}
}
#[tonic::async_trait]
impl Raas for RaaSServer {
async fn send_request(
&self,
request: Request<raas_types::raas::cmd::Request>,
) -> Result<Response<raas_types::raas::resp::Response>, Status> {
let mut command_stream = self.command_stream.lock().await;
let remote_addr = request.remote_addr().unwrap();
let request = request.into_inner();
info!("Got request from {}: {:?}", remote_addr, request);
let mut msg = Vec::new();
request.encode(&mut msg).unwrap();
send_raas_msg(command_stream.deref_mut(), msg)
.await
.unwrap();
let resp = recv_raas_msg(command_stream.deref_mut()).await.unwrap();
let resp = raas_types::raas::resp::Response::decode(&*resp.msg).unwrap();
Ok(Response::new(resp))
}
}

View File

@ -0,0 +1 @@
pub mod grpc;