Initial commit
+ Basic robot connector and manages + Handles keep alive ping, with some issues + Need GRPC or other apimain
commit
c6031f5fa4
|
@ -0,0 +1,2 @@
|
|||
[registries.jojo-dev]
|
||||
index = "https://git.jojodev.com/joeyahines/_cargo-index.git"
|
|
@ -0,0 +1,2 @@
|
|||
/target
|
||||
.idea/
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,15 @@
|
|||
[package]
|
||||
name = "raas"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
raas_types = { version = "0.0.5", features = ["async"], registry = "jojo-dev"}
|
||||
prost = "0.12.6"
|
||||
tokio = {version = "1.38.0", features = ["io-util", "net", "rt-multi-thread"]}
|
||||
axum = "0.7.5"
|
||||
env_logger = "0.11.3"
|
||||
log = "0.4.21"
|
||||
thiserror = "1.0.61"
|
|
@ -0,0 +1,50 @@
|
|||
use crate::robot::robot_manager::RobotManager;
|
||||
use log::info;
|
||||
use prost::Message;
|
||||
use raas_types::raas;
|
||||
use raas_types::raas::cmd::Request;
|
||||
use raas_types::raas::ping::Ping;
|
||||
use raas_types::raas::{recv_raas_msg, send_raas_msg};
|
||||
use std::time::Duration;
|
||||
use tokio::io::duplex;
|
||||
|
||||
mod robot;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
env_logger::init();
|
||||
info!("Starting...");
|
||||
|
||||
let (mut main_stream, manager_stream) = duplex(10);
|
||||
let mut manager = RobotManager::new("0.0.0.0:8080", manager_stream);
|
||||
|
||||
manager.start_manager().await.unwrap();
|
||||
|
||||
loop {
|
||||
info!("Sleeping");
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
info!("Sending Ping");
|
||||
|
||||
let ping = Ping {
|
||||
data: "ping msg".to_string(),
|
||||
};
|
||||
|
||||
let req = Request {
|
||||
timestamp: 0,
|
||||
cmd: Some(raas::cmd::request::Cmd::RollCmd(raas::bot::roll::RollCmd {
|
||||
cmd: Some(raas::bot::roll::roll_cmd::Cmd::Ping(ping)),
|
||||
})),
|
||||
};
|
||||
|
||||
let mut msg = Vec::new();
|
||||
|
||||
req.encode(&mut msg).unwrap();
|
||||
|
||||
send_raas_msg(&mut main_stream, msg).await.unwrap();
|
||||
|
||||
info!("Wait for resp");
|
||||
let resp = recv_raas_msg(&mut main_stream).await.unwrap();
|
||||
|
||||
info!("Got resp {:?}", resp);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
pub mod robot_connector;
|
||||
pub mod robot_manager;
|
||||
|
||||
pub const ROBOT_MESSAGE_QUEUE_SIZE: usize = 10;
|
|
@ -0,0 +1,145 @@
|
|||
use log::{error, info};
|
||||
use prost::Message;
|
||||
use raas_types::raas::cmd::{Command, Request};
|
||||
use raas_types::raas::register::BotTypes;
|
||||
use raas_types::raas::resp::Response;
|
||||
use raas_types::raas::{recv_raas_msg, send_raas_msg};
|
||||
use thiserror::Error;
|
||||
use tokio::io::DuplexStream;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("IO Error")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("Protobuf Encode Error")]
|
||||
ProtobufEncode(#[from] prost::EncodeError),
|
||||
#[error("Protobuf Decode Error")]
|
||||
ProtobufDecode(#[from] prost::DecodeError),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RobotConnector {
|
||||
pub bot_id: u32,
|
||||
pub bot_tags: Vec<BotTypes>,
|
||||
|
||||
tcp_stream: TcpStream,
|
||||
msg_stream: DuplexStream,
|
||||
|
||||
is_running: bool,
|
||||
}
|
||||
|
||||
impl RobotConnector {
|
||||
pub fn new(
|
||||
bot_id: u32,
|
||||
bot_tags: Vec<BotTypes>,
|
||||
tcp_stream: TcpStream,
|
||||
msg_stream: DuplexStream,
|
||||
) -> Self {
|
||||
Self {
|
||||
bot_id,
|
||||
bot_tags,
|
||||
tcp_stream,
|
||||
msg_stream,
|
||||
is_running: false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_request(&mut self) -> Result<Request, Error> {
|
||||
let request = recv_raas_msg(&mut self.msg_stream).await?;
|
||||
|
||||
let request = Request::decode(request.msg.as_slice())?;
|
||||
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
async fn send_command(&mut self, request: Request) -> Result<(), Error> {
|
||||
let cmd = Command {
|
||||
id: self.bot_id,
|
||||
request: Some(request),
|
||||
};
|
||||
|
||||
let mut msg = Vec::new();
|
||||
|
||||
cmd.encode(&mut msg).unwrap();
|
||||
send_raas_msg(&mut self.tcp_stream, msg).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_response(&mut self) -> Result<Response, Error> {
|
||||
let recv = recv_raas_msg(&mut self.tcp_stream).await?;
|
||||
let resp = Response::decode(recv.msg.as_slice()).unwrap();
|
||||
|
||||
info!("Worker (bot_id={}) got resp: {:?}", self.bot_id, resp);
|
||||
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
async fn respond_to_request(&mut self, resp: Response) -> Result<(), Error> {
|
||||
let mut msg = Vec::new();
|
||||
resp.encode(&mut msg)?;
|
||||
|
||||
send_raas_msg(&mut self.msg_stream, msg).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn is_running(&self) -> bool {
|
||||
self.is_running
|
||||
}
|
||||
|
||||
pub async fn worker(&mut self) {
|
||||
self.is_running = true;
|
||||
|
||||
loop {
|
||||
info!("Worker (bot_id={}) is waiting for requests", self.bot_id);
|
||||
let request = match self.wait_for_request().await {
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to get request: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match self.send_command(request).await {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to send command to bot: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let resp = match self.get_response().await {
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to get response from bot: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match self.respond_to_request(resp).await {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to send response: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
error!("Exiting Worker (bot_id={})", self.bot_id);
|
||||
self.is_running = false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,176 @@
|
|||
use crate::robot::robot_connector::{Error, RobotConnector};
|
||||
use crate::robot::ROBOT_MESSAGE_QUEUE_SIZE;
|
||||
use log::{error, info};
|
||||
use prost::Message;
|
||||
use raas_types::raas;
|
||||
use raas_types::raas::bot::roll::Roll;
|
||||
use raas_types::raas::cmd::request::Cmd;
|
||||
use raas_types::raas::cmd::{Command, Request};
|
||||
use raas_types::raas::register::{Register, RegisterResponse};
|
||||
use raas_types::raas::resp::response::Resp;
|
||||
use raas_types::raas::resp::Response;
|
||||
use raas_types::raas::{recv_raas_msg, send_raas_msg, RaasMessage};
|
||||
use std::collections::HashMap;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::select;
|
||||
use tokio::time::timeout;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RaaSCmd {
|
||||
Roll(u32),
|
||||
Img(Vec<u8>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ConnectorHandle {
|
||||
pub id: u32,
|
||||
pub stream: DuplexStream,
|
||||
pub tags: Vec<raas::register::BotTypes>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ConnectorManager {
|
||||
next_id: u32,
|
||||
connectors: HashMap<u32, ConnectorHandle>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RobotManager {
|
||||
addr: String,
|
||||
connector_manager: std::sync::Arc<tokio::sync::Mutex<ConnectorManager>>,
|
||||
command_stream: std::sync::Arc<tokio::sync::Mutex<DuplexStream>>,
|
||||
}
|
||||
|
||||
impl RobotManager {
|
||||
pub fn new(addr: &str, stream: DuplexStream) -> Self {
|
||||
let connector_manager = ConnectorManager {
|
||||
next_id: 0,
|
||||
connectors: Default::default(),
|
||||
};
|
||||
|
||||
Self {
|
||||
addr: addr.to_string(),
|
||||
connector_manager: std::sync::Arc::new(tokio::sync::Mutex::new(connector_manager)),
|
||||
command_stream: std::sync::Arc::new(tokio::sync::Mutex::new(stream)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_register(&self, tcp_listener: &mut TcpListener) -> Result<(), std::io::Error> {
|
||||
info!("Listening for connection at {}", self.addr);
|
||||
let (mut socket, addr) = tcp_listener.accept().await?;
|
||||
info!("Got connection from {}", addr);
|
||||
|
||||
let register_msg = recv_raas_msg(&mut socket).await?;
|
||||
|
||||
let register = Register::decode(&*register_msg.msg).unwrap();
|
||||
|
||||
let mut connector_manager = self.connector_manager.lock().await;
|
||||
|
||||
let id = connector_manager.next_id;
|
||||
connector_manager.next_id += 1;
|
||||
|
||||
let register_resp = RegisterResponse {
|
||||
name: register.name.to_string(),
|
||||
r#type: register.bot_type,
|
||||
id,
|
||||
};
|
||||
|
||||
let mut msg = Vec::new();
|
||||
|
||||
register_resp.encode(&mut msg).unwrap();
|
||||
|
||||
send_raas_msg(&mut socket, msg).await?;
|
||||
|
||||
let (connector_duplex_manager, connector_duplex_connector) =
|
||||
duplex(ROBOT_MESSAGE_QUEUE_SIZE);
|
||||
|
||||
let mut connector = RobotConnector::new(
|
||||
id,
|
||||
vec![register.bot_type()],
|
||||
socket,
|
||||
connector_duplex_connector,
|
||||
);
|
||||
|
||||
let connector_handler = ConnectorHandle {
|
||||
id,
|
||||
stream: connector_duplex_manager,
|
||||
tags: connector.bot_tags.clone(),
|
||||
};
|
||||
|
||||
connector_manager.connectors.insert(id, connector_handler);
|
||||
|
||||
tokio::spawn(async move { connector.worker().await });
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_command(&self) -> Result<RaasMessage, Error> {
|
||||
let mut command_stream = self.command_stream.lock().await;
|
||||
|
||||
let resp = recv_raas_msg(command_stream.deref_mut()).await?;
|
||||
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
async fn send_resp(&self, msg: RaasMessage) -> Result<(), Error> {
|
||||
let mut command_stream = self.command_stream.lock().await;
|
||||
|
||||
command_stream.write_all(&*msg.into_bytes()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn handle_command(&self) -> Result<(), Error> {
|
||||
info!("Waiting for command...");
|
||||
|
||||
let msg = self.recv_command().await?;
|
||||
|
||||
let request = Request::decode(&*msg.msg)?;
|
||||
|
||||
let dest_type = match request.cmd.clone().unwrap() {
|
||||
Cmd::RollCmd(_) => raas::register::BotTypes::Roll,
|
||||
};
|
||||
|
||||
let mut connector_manager = self.connector_manager.lock().await;
|
||||
let connector = connector_manager
|
||||
.connectors
|
||||
.iter_mut()
|
||||
.find(|(_, handler)| handler.tags.contains(&dest_type));
|
||||
|
||||
if let Some((id, connector)) = connector {
|
||||
info!("Sending message to connector with id={}", id);
|
||||
let mut msg = Vec::new();
|
||||
request.encode(&mut msg)?;
|
||||
send_raas_msg(&mut connector.stream, msg).await?;
|
||||
|
||||
let resp = recv_raas_msg(&mut connector.stream).await?;
|
||||
info!("Got response from id={}", id);
|
||||
self.send_resp(resp).await?;
|
||||
} else {
|
||||
error!("No connectors available to handle request");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start_manager(self) -> Result<(), std::io::Error> {
|
||||
let handle_command_manager = self.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
handle_command_manager.handle_command().await.unwrap()
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut tcp_listener = TcpListener::bind(&self.addr).await.unwrap();
|
||||
loop {
|
||||
self.handle_register(&mut tcp_listener).await.unwrap()
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue