Add pack in ping handling

main
Joey Hines 2024-06-23 14:15:05 -06:00
parent 530b51d581
commit 5b1251d089
Signed by: joeyahines
GPG Key ID: 995E531F7A569DDB
1 changed files with 44 additions and 5 deletions

View File

@ -1,10 +1,11 @@
use crate::robot::{Error, ROBOT_TIMEOUT}; use crate::robot::{Error, ROBOT_TIMEOUT};
use log::{error, info}; use log::{debug, error, info};
use prost::Message; use prost::Message;
use raas_types::raas::cmd::{Command, Request}; use raas_types::raas::cmd::{Command, Request};
use raas_types::raas::register::BotTypes; use raas_types::raas::register::BotTypes;
use raas_types::raas::resp::Response; use raas_types::raas::resp::Response;
use raas_types::raas::{recv_raas_msg, send_raas_msg}; use raas_types::raas::{recv_raas_msg, send_raas_msg};
use std::time::Duration;
use tokio::io::DuplexStream; use tokio::io::DuplexStream;
use tokio::net::TcpStream; use tokio::net::TcpStream;
@ -63,7 +64,7 @@ impl RobotConnector {
let resp = Response::decode(recv.msg.as_slice()).unwrap(); let resp = Response::decode(recv.msg.as_slice()).unwrap();
info!("Worker (bot_id={}) got resp", self.bot_id); debug!("Worker (bot_id={}) got resp", self.bot_id);
Ok(resp) Ok(resp)
} }
@ -81,16 +82,54 @@ impl RobotConnector {
self.is_running = true; self.is_running = true;
loop { loop {
if let Err(res) = self.handle_request().await { let timeout_res =
error!("Closing robot connection: {}", res); tokio::time::timeout(Duration::from_secs(120), self.handle_request()).await;
if let Ok(res) = timeout_res {
if let Err(res) = res {
error!(
"Worker (bot_id = {}) Closing robot connection: {}",
self.bot_id, res
);
break; break;
} }
} else if let Err(err) = self.handle_ping().await {
error!(
"Worker (bot_id = {}) Got error response from ping: {}",
self.bot_id, err
)
}
} }
error!("Exiting Worker (bot_id={})", self.bot_id); error!("Exiting Worker (bot_id={})", self.bot_id);
self.is_running = false; self.is_running = false;
} }
async fn handle_ping(&mut self) -> Result<(), Error> {
self.send_command(Request {
timestamp: chrono::Utc::now().timestamp() as u64,
cmd: Some(raas_types::raas::cmd::request::Cmd::RollCmd(
raas_types::raas::bot::roll::RollCmd {
cmd: Some(raas_types::raas::bot::roll::roll_cmd::Cmd::Ping(
raas_types::raas::ping::Ping {
data: format!("id: {}", self.bot_id),
},
)),
},
)),
})
.await?;
let resp = tokio::time::timeout(Duration::from_secs(5), self.get_response()).await??;
debug!(
"Worker (bot_id = {}) Got ping resp: {:?}",
self.bot_id, resp
);
Ok(())
}
async fn handle_request(&mut self) -> Result<(), Error> { async fn handle_request(&mut self) -> Result<(), Error> {
info!("Worker (bot_id={}) is waiting for requests", self.bot_id); info!("Worker (bot_id={}) is waiting for requests", self.bot_id);
let request = match self.wait_for_request().await { let request = match self.wait_for_request().await {