Refactored, handle failures more gracefully
+ Split ConnectionHandler into its own struct + Handle errors more gracefully + System no longer locks up or crashes if a bot goes offline + Bots who go offline are automatically removed from the connector poolmain
parent
c6031f5fa4
commit
0136772c23
45
src/main.rs
45
src/main.rs
|
@ -1,12 +1,17 @@
|
||||||
|
use crate::robot::connection_handler::ConnectionHandler;
|
||||||
use crate::robot::robot_manager::RobotManager;
|
use crate::robot::robot_manager::RobotManager;
|
||||||
use log::info;
|
use crate::robot::ConnectorManager;
|
||||||
|
use log::{error, info};
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
use raas_types::raas;
|
use raas_types::raas;
|
||||||
use raas_types::raas::cmd::Request;
|
use raas_types::raas::cmd::Request;
|
||||||
use raas_types::raas::ping::Ping;
|
use raas_types::raas::ping::Ping;
|
||||||
use raas_types::raas::{recv_raas_msg, send_raas_msg};
|
use raas_types::raas::{recv_raas_msg, send_raas_msg};
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::io::duplex;
|
use tokio::io::duplex;
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
mod robot;
|
mod robot;
|
||||||
|
|
||||||
|
@ -15,10 +20,25 @@ async fn main() {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
info!("Starting...");
|
info!("Starting...");
|
||||||
|
|
||||||
let (mut main_stream, manager_stream) = duplex(10);
|
let addr = "0.0.0.0:8080";
|
||||||
let mut manager = RobotManager::new("0.0.0.0:8080", manager_stream);
|
|
||||||
|
|
||||||
manager.start_manager().await.unwrap();
|
let connector_manager = ConnectorManager {
|
||||||
|
next_id: 0,
|
||||||
|
connectors: Default::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let connector_manager = Arc::new(Mutex::new(connector_manager));
|
||||||
|
|
||||||
|
let tcp_listener = TcpListener::bind(addr).await.unwrap();
|
||||||
|
|
||||||
|
let (mut main_stream, manager_stream) = duplex(10);
|
||||||
|
|
||||||
|
let robot_manager = RobotManager::new(manager_stream);
|
||||||
|
let connector_handler = ConnectionHandler::new(addr, tcp_listener);
|
||||||
|
|
||||||
|
let robot_manager_connector_manager = connector_manager.clone();
|
||||||
|
tokio::task::spawn(robot_manager.worker(robot_manager_connector_manager));
|
||||||
|
tokio::task::spawn(connector_handler.worker(connector_manager));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
info!("Sleeping");
|
info!("Sleeping");
|
||||||
|
@ -43,8 +63,21 @@ async fn main() {
|
||||||
send_raas_msg(&mut main_stream, msg).await.unwrap();
|
send_raas_msg(&mut main_stream, msg).await.unwrap();
|
||||||
|
|
||||||
info!("Wait for resp");
|
info!("Wait for resp");
|
||||||
let resp = recv_raas_msg(&mut main_stream).await.unwrap();
|
let ret =
|
||||||
|
tokio::time::timeout(Duration::from_secs(5), recv_raas_msg(&mut main_stream)).await;
|
||||||
|
|
||||||
info!("Got resp {:?}", resp);
|
match ret {
|
||||||
|
Ok(resp) => match resp {
|
||||||
|
Ok(resp) => {
|
||||||
|
info!("Got response {:?}", resp);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("Error getting response: {}", err)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Timed out waiting for response!")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
use crate::robot::robot_connector::RobotConnector;
|
||||||
|
use crate::robot::ROBOT_MESSAGE_QUEUE_SIZE;
|
||||||
|
use crate::robot::{ConnectorManager, Error};
|
||||||
|
use log::{error, info};
|
||||||
|
use prost::Message;
|
||||||
|
use raas_types::raas::register::{Register, RegisterResponse};
|
||||||
|
use raas_types::raas::{recv_raas_msg, send_raas_msg};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::io::duplex;
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
pub struct ConnectionHandler {
|
||||||
|
tcp_listener: TcpListener,
|
||||||
|
addr: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionHandler {
|
||||||
|
pub fn new(addr: &str, tcp_listener: TcpListener) -> Self {
|
||||||
|
Self {
|
||||||
|
tcp_listener,
|
||||||
|
addr: addr.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_register(
|
||||||
|
&mut self,
|
||||||
|
connector_manager: Arc<Mutex<ConnectorManager>>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
info!("Listening for connection at {}", self.addr);
|
||||||
|
let (mut socket, addr) = self.tcp_listener.accept().await?;
|
||||||
|
info!("Got connection from {}", addr);
|
||||||
|
|
||||||
|
let register_msg = recv_raas_msg(&mut socket).await?;
|
||||||
|
|
||||||
|
let register = Register::decode(&*register_msg.msg)?;
|
||||||
|
|
||||||
|
let mut connector_manager = connector_manager.lock().await;
|
||||||
|
|
||||||
|
let id = connector_manager.next_id;
|
||||||
|
connector_manager.next_id += 1;
|
||||||
|
|
||||||
|
let register_resp = RegisterResponse {
|
||||||
|
name: register.name.to_string(),
|
||||||
|
r#type: register.bot_type,
|
||||||
|
id,
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut msg = Vec::new();
|
||||||
|
|
||||||
|
register_resp.encode(&mut msg)?;
|
||||||
|
|
||||||
|
send_raas_msg(&mut socket, msg).await?;
|
||||||
|
|
||||||
|
let (connector_duplex_manager, connector_duplex_connector) =
|
||||||
|
duplex(ROBOT_MESSAGE_QUEUE_SIZE);
|
||||||
|
|
||||||
|
let mut connector = RobotConnector::new(
|
||||||
|
id,
|
||||||
|
vec![register.bot_type()],
|
||||||
|
socket,
|
||||||
|
connector_duplex_connector,
|
||||||
|
);
|
||||||
|
|
||||||
|
let connector_handler = crate::robot::ConnectorHandle {
|
||||||
|
stream: connector_duplex_manager,
|
||||||
|
tags: connector.bot_tags.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
connector_manager.connectors.insert(id, connector_handler);
|
||||||
|
|
||||||
|
tokio::spawn(async move { connector.worker().await });
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn worker(
|
||||||
|
mut self,
|
||||||
|
connector_manager: Arc<Mutex<ConnectorManager>>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
loop {
|
||||||
|
let res = self.handle_register(connector_manager.clone()).await;
|
||||||
|
|
||||||
|
if let Err(err) = res {
|
||||||
|
error!("Got error handling new register: {}", err);
|
||||||
|
|
||||||
|
if let Error::Io(_io_err) = &err {
|
||||||
|
error!("IO Error, exiting...");
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,34 @@
|
||||||
|
use raas_types::raas;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use thiserror::Error;
|
||||||
|
use tokio::io::DuplexStream;
|
||||||
|
|
||||||
|
pub mod connection_handler;
|
||||||
pub mod robot_connector;
|
pub mod robot_connector;
|
||||||
pub mod robot_manager;
|
pub mod robot_manager;
|
||||||
|
|
||||||
pub const ROBOT_MESSAGE_QUEUE_SIZE: usize = 10;
|
pub const ROBOT_MESSAGE_QUEUE_SIZE: usize = 10;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ConnectorHandle {
|
||||||
|
pub stream: DuplexStream,
|
||||||
|
pub tags: Vec<raas::register::BotTypes>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ConnectorManager {
|
||||||
|
pub next_id: u32,
|
||||||
|
pub connectors: HashMap<u32, ConnectorHandle>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("IO Error")]
|
||||||
|
Io(#[from] std::io::Error),
|
||||||
|
#[error("Protobuf Encode Error")]
|
||||||
|
ProtobufEncode(#[from] prost::EncodeError),
|
||||||
|
#[error("Protobuf Decode Error")]
|
||||||
|
ProtobufDecode(#[from] prost::DecodeError),
|
||||||
|
#[error("Connection to bot has been closed")]
|
||||||
|
ConnectionClosed(u32),
|
||||||
|
}
|
||||||
|
|
|
@ -1,23 +1,13 @@
|
||||||
|
use crate::robot::Error;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
use raas_types::raas::cmd::{Command, Request};
|
use raas_types::raas::cmd::{Command, Request};
|
||||||
use raas_types::raas::register::BotTypes;
|
use raas_types::raas::register::BotTypes;
|
||||||
use raas_types::raas::resp::Response;
|
use raas_types::raas::resp::Response;
|
||||||
use raas_types::raas::{recv_raas_msg, send_raas_msg};
|
use raas_types::raas::{recv_raas_msg, send_raas_msg};
|
||||||
use thiserror::Error;
|
|
||||||
use tokio::io::DuplexStream;
|
use tokio::io::DuplexStream;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
|
||||||
pub enum Error {
|
|
||||||
#[error("IO Error")]
|
|
||||||
Io(#[from] std::io::Error),
|
|
||||||
#[error("Protobuf Encode Error")]
|
|
||||||
ProtobufEncode(#[from] prost::EncodeError),
|
|
||||||
#[error("Protobuf Decode Error")]
|
|
||||||
ProtobufDecode(#[from] prost::DecodeError),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct RobotConnector {
|
pub struct RobotConnector {
|
||||||
pub bot_id: u32,
|
pub bot_id: u32,
|
||||||
|
@ -85,10 +75,6 @@ impl RobotConnector {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_running(&self) -> bool {
|
|
||||||
self.is_running
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn worker(&mut self) {
|
pub async fn worker(&mut self) {
|
||||||
self.is_running = true;
|
self.is_running = true;
|
||||||
|
|
||||||
|
|
|
@ -1,129 +1,42 @@
|
||||||
use crate::robot::robot_connector::{Error, RobotConnector};
|
use crate::robot::{ConnectorManager, Error};
|
||||||
use crate::robot::ROBOT_MESSAGE_QUEUE_SIZE;
|
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use prost::Message;
|
use prost::Message;
|
||||||
use raas_types::raas;
|
use raas_types::raas;
|
||||||
use raas_types::raas::bot::roll::Roll;
|
|
||||||
use raas_types::raas::cmd::request::Cmd;
|
use raas_types::raas::cmd::request::Cmd;
|
||||||
use raas_types::raas::cmd::{Command, Request};
|
use raas_types::raas::cmd::Request;
|
||||||
use raas_types::raas::register::{Register, RegisterResponse};
|
|
||||||
use raas_types::raas::resp::response::Resp;
|
|
||||||
use raas_types::raas::resp::Response;
|
|
||||||
use raas_types::raas::{recv_raas_msg, send_raas_msg, RaasMessage};
|
use raas_types::raas::{recv_raas_msg, send_raas_msg, RaasMessage};
|
||||||
use std::collections::HashMap;
|
use std::sync::Arc;
|
||||||
use std::ops::{Deref, DerefMut};
|
use tokio::io::{AsyncWriteExt, DuplexStream};
|
||||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
use tokio::sync::Mutex;
|
||||||
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream};
|
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
|
||||||
use tokio::select;
|
|
||||||
use tokio::time::timeout;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub enum RaaSCmd {
|
|
||||||
Roll(u32),
|
|
||||||
Img(Vec<u8>),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct ConnectorHandle {
|
|
||||||
pub id: u32,
|
|
||||||
pub stream: DuplexStream,
|
|
||||||
pub tags: Vec<raas::register::BotTypes>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct ConnectorManager {
|
|
||||||
next_id: u32,
|
|
||||||
connectors: HashMap<u32, ConnectorHandle>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct RobotManager {
|
pub struct RobotManager {
|
||||||
addr: String,
|
command_stream: DuplexStream,
|
||||||
connector_manager: std::sync::Arc<tokio::sync::Mutex<ConnectorManager>>,
|
|
||||||
command_stream: std::sync::Arc<tokio::sync::Mutex<DuplexStream>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RobotManager {
|
impl RobotManager {
|
||||||
pub fn new(addr: &str, stream: DuplexStream) -> Self {
|
pub fn new(stream: DuplexStream) -> Self {
|
||||||
let connector_manager = ConnectorManager {
|
|
||||||
next_id: 0,
|
|
||||||
connectors: Default::default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
addr: addr.to_string(),
|
command_stream: stream,
|
||||||
connector_manager: std::sync::Arc::new(tokio::sync::Mutex::new(connector_manager)),
|
|
||||||
command_stream: std::sync::Arc::new(tokio::sync::Mutex::new(stream)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_register(&self, tcp_listener: &mut TcpListener) -> Result<(), std::io::Error> {
|
async fn recv_command(&mut self) -> Result<RaasMessage, Error> {
|
||||||
info!("Listening for connection at {}", self.addr);
|
let resp = recv_raas_msg(&mut self.command_stream).await?;
|
||||||
let (mut socket, addr) = tcp_listener.accept().await?;
|
|
||||||
info!("Got connection from {}", addr);
|
|
||||||
|
|
||||||
let register_msg = recv_raas_msg(&mut socket).await?;
|
|
||||||
|
|
||||||
let register = Register::decode(&*register_msg.msg).unwrap();
|
|
||||||
|
|
||||||
let mut connector_manager = self.connector_manager.lock().await;
|
|
||||||
|
|
||||||
let id = connector_manager.next_id;
|
|
||||||
connector_manager.next_id += 1;
|
|
||||||
|
|
||||||
let register_resp = RegisterResponse {
|
|
||||||
name: register.name.to_string(),
|
|
||||||
r#type: register.bot_type,
|
|
||||||
id,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut msg = Vec::new();
|
|
||||||
|
|
||||||
register_resp.encode(&mut msg).unwrap();
|
|
||||||
|
|
||||||
send_raas_msg(&mut socket, msg).await?;
|
|
||||||
|
|
||||||
let (connector_duplex_manager, connector_duplex_connector) =
|
|
||||||
duplex(ROBOT_MESSAGE_QUEUE_SIZE);
|
|
||||||
|
|
||||||
let mut connector = RobotConnector::new(
|
|
||||||
id,
|
|
||||||
vec![register.bot_type()],
|
|
||||||
socket,
|
|
||||||
connector_duplex_connector,
|
|
||||||
);
|
|
||||||
|
|
||||||
let connector_handler = ConnectorHandle {
|
|
||||||
id,
|
|
||||||
stream: connector_duplex_manager,
|
|
||||||
tags: connector.bot_tags.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
connector_manager.connectors.insert(id, connector_handler);
|
|
||||||
|
|
||||||
tokio::spawn(async move { connector.worker().await });
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn recv_command(&self) -> Result<RaasMessage, Error> {
|
|
||||||
let mut command_stream = self.command_stream.lock().await;
|
|
||||||
|
|
||||||
let resp = recv_raas_msg(command_stream.deref_mut()).await?;
|
|
||||||
|
|
||||||
Ok(resp)
|
Ok(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_resp(&self, msg: RaasMessage) -> Result<(), Error> {
|
async fn send_resp(&mut self, msg: RaasMessage) -> Result<(), Error> {
|
||||||
let mut command_stream = self.command_stream.lock().await;
|
self.command_stream.write_all(&msg.into_bytes()).await?;
|
||||||
|
|
||||||
command_stream.write_all(&*msg.into_bytes()).await?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_command(&self) -> Result<(), Error> {
|
async fn handle_command(
|
||||||
|
&mut self,
|
||||||
|
connector_manager: Arc<Mutex<ConnectorManager>>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
info!("Waiting for command...");
|
info!("Waiting for command...");
|
||||||
|
|
||||||
let msg = self.recv_command().await?;
|
let msg = self.recv_command().await?;
|
||||||
|
@ -134,7 +47,7 @@ impl RobotManager {
|
||||||
Cmd::RollCmd(_) => raas::register::BotTypes::Roll,
|
Cmd::RollCmd(_) => raas::register::BotTypes::Roll,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut connector_manager = self.connector_manager.lock().await;
|
let mut connector_manager = connector_manager.lock().await;
|
||||||
let connector = connector_manager
|
let connector = connector_manager
|
||||||
.connectors
|
.connectors
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
|
@ -144,7 +57,15 @@ impl RobotManager {
|
||||||
info!("Sending message to connector with id={}", id);
|
info!("Sending message to connector with id={}", id);
|
||||||
let mut msg = Vec::new();
|
let mut msg = Vec::new();
|
||||||
request.encode(&mut msg)?;
|
request.encode(&mut msg)?;
|
||||||
send_raas_msg(&mut connector.stream, msg).await?;
|
let ret = send_raas_msg(&mut connector.stream, msg).await;
|
||||||
|
|
||||||
|
if let Err(err) = ret {
|
||||||
|
error!(
|
||||||
|
"Got '{}' sending message to Robot Connector id={}, closing connection",
|
||||||
|
err, id
|
||||||
|
);
|
||||||
|
return Err(Error::ConnectionClosed(*id));
|
||||||
|
}
|
||||||
|
|
||||||
let resp = recv_raas_msg(&mut connector.stream).await?;
|
let resp = recv_raas_msg(&mut connector.stream).await?;
|
||||||
info!("Got response from id={}", id);
|
info!("Got response from id={}", id);
|
||||||
|
@ -156,21 +77,22 @@ impl RobotManager {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_manager(self) -> Result<(), std::io::Error> {
|
pub async fn worker(
|
||||||
let handle_command_manager = self.clone();
|
mut self,
|
||||||
tokio::spawn(async move {
|
connector_manager: Arc<Mutex<ConnectorManager>>,
|
||||||
loop {
|
) -> Result<(), Error> {
|
||||||
handle_command_manager.handle_command().await.unwrap()
|
loop {
|
||||||
}
|
let res = self.handle_command(connector_manager.clone()).await;
|
||||||
});
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
if let Err(err) = res {
|
||||||
let mut tcp_listener = TcpListener::bind(&self.addr).await.unwrap();
|
error!("Got error in robot manager: {}", err);
|
||||||
loop {
|
|
||||||
self.handle_register(&mut tcp_listener).await.unwrap()
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(())
|
if let Error::ConnectionClosed(id) = err {
|
||||||
|
let mut connection_manager = connector_manager.lock().await;
|
||||||
|
|
||||||
|
connection_manager.connectors.remove(&id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue