diff --git a/src/robot/robot_connector.rs b/src/robot/robot_connector.rs index fd3b603..e6f7b77 100644 --- a/src/robot/robot_connector.rs +++ b/src/robot/robot_connector.rs @@ -1,10 +1,11 @@ use crate::robot::{Error, ROBOT_TIMEOUT}; -use log::{error, info}; +use log::{debug, error, info}; use prost::Message; use raas_types::raas::cmd::{Command, Request}; use raas_types::raas::register::BotTypes; use raas_types::raas::resp::Response; use raas_types::raas::{recv_raas_msg, send_raas_msg}; +use std::time::Duration; use tokio::io::DuplexStream; use tokio::net::TcpStream; @@ -63,7 +64,7 @@ impl RobotConnector { let resp = Response::decode(recv.msg.as_slice()).unwrap(); - info!("Worker (bot_id={}) got resp", self.bot_id); + debug!("Worker (bot_id={}) got resp", self.bot_id); Ok(resp) } @@ -81,9 +82,22 @@ impl RobotConnector { self.is_running = true; loop { - if let Err(res) = self.handle_request().await { - error!("Closing robot connection: {}", res); - break; + let timeout_res = + tokio::time::timeout(Duration::from_secs(120), self.handle_request()).await; + + if let Ok(res) = timeout_res { + if let Err(res) = res { + error!( + "Worker (bot_id = {}) Closing robot connection: {}", + self.bot_id, res + ); + break; + } + } else if let Err(err) = self.handle_ping().await { + error!( + "Worker (bot_id = {}) Got error response from ping: {}", + self.bot_id, err + ) } } @@ -91,6 +105,31 @@ impl RobotConnector { self.is_running = false; } + async fn handle_ping(&mut self) -> Result<(), Error> { + self.send_command(Request { + timestamp: chrono::Utc::now().timestamp() as u64, + cmd: Some(raas_types::raas::cmd::request::Cmd::RollCmd( + raas_types::raas::bot::roll::RollCmd { + cmd: Some(raas_types::raas::bot::roll::roll_cmd::Cmd::Ping( + raas_types::raas::ping::Ping { + data: format!("id: {}", self.bot_id), + }, + )), + }, + )), + }) + .await?; + + let resp = tokio::time::timeout(Duration::from_secs(5), self.get_response()).await??; + + debug!( + "Worker (bot_id = {}) Got ping resp: {:?}", + self.bot_id, resp + ); + + Ok(()) + } + async fn handle_request(&mut self) -> Result<(), Error> { info!("Worker (bot_id={}) is waiting for requests", self.bot_id); let request = match self.wait_for_request().await {