Support timeouts while waiting for messages from robots
parent
cf64eafcb5
commit
530b51d581
|
@ -26,6 +26,21 @@ dependencies = [
|
|||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "android-tzdata"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
|
||||
|
||||
[[package]]
|
||||
name = "android_system_properties"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ansi_term"
|
||||
version = "0.12.1"
|
||||
|
@ -285,6 +300,12 @@ dependencies = [
|
|||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bumpalo"
|
||||
version = "3.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.6.0"
|
||||
|
@ -303,6 +324,20 @@ version = "1.0.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "chrono"
|
||||
version = "0.4.38"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
|
||||
dependencies = [
|
||||
"android-tzdata",
|
||||
"iana-time-zone",
|
||||
"js-sys",
|
||||
"num-traits",
|
||||
"wasm-bindgen",
|
||||
"windows-targets 0.52.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "2.34.0"
|
||||
|
@ -373,6 +408,12 @@ dependencies = [
|
|||
"unicode-segmentation",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation-sys"
|
||||
version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.2.12"
|
||||
|
@ -766,6 +807,29 @@ dependencies = [
|
|||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "iana-time-zone"
|
||||
version = "0.1.60"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141"
|
||||
dependencies = [
|
||||
"android_system_properties",
|
||||
"core-foundation-sys",
|
||||
"iana-time-zone-haiku",
|
||||
"js-sys",
|
||||
"wasm-bindgen",
|
||||
"windows-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "iana-time-zone-haiku"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
|
||||
dependencies = [
|
||||
"cc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.9.3"
|
||||
|
@ -807,6 +871,15 @@ version = "1.0.11"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.69"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d"
|
||||
dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "json5"
|
||||
version = "0.4.1"
|
||||
|
@ -908,6 +981,15 @@ dependencies = [
|
|||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_cpus"
|
||||
version = "1.16.0"
|
||||
|
@ -1158,6 +1240,7 @@ name = "raas"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"axum 0.7.5",
|
||||
"chrono",
|
||||
"config",
|
||||
"env_logger",
|
||||
"log",
|
||||
|
@ -1777,6 +1860,60 @@ version = "0.11.0+wasi-snapshot-preview1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen"
|
||||
version = "0.2.92"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"wasm-bindgen-macro",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-backend"
|
||||
version = "0.2.92"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da"
|
||||
dependencies = [
|
||||
"bumpalo",
|
||||
"log",
|
||||
"once_cell",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.66",
|
||||
"wasm-bindgen-shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-macro"
|
||||
version = "0.2.92"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726"
|
||||
dependencies = [
|
||||
"quote",
|
||||
"wasm-bindgen-macro-support",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-macro-support"
|
||||
version = "0.2.92"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.66",
|
||||
"wasm-bindgen-backend",
|
||||
"wasm-bindgen-shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-shared"
|
||||
version = "0.2.92"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96"
|
||||
|
||||
[[package]]
|
||||
name = "winapi"
|
||||
version = "0.3.9"
|
||||
|
@ -1799,6 +1936,15 @@ version = "0.4.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "windows-core"
|
||||
version = "0.52.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
|
||||
dependencies = [
|
||||
"windows-targets 0.52.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windows-sys"
|
||||
version = "0.48.0"
|
||||
|
|
|
@ -17,6 +17,7 @@ tonic = "0.11.0"
|
|||
config = "0.14.0"
|
||||
serde = "1.0.203"
|
||||
structopt = "0.3.26"
|
||||
chrono = "0.4.38"
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.11.0"
|
|
@ -8,6 +8,7 @@ pub mod robot_connector;
|
|||
pub mod robot_manager;
|
||||
|
||||
pub const ROBOT_MESSAGE_QUEUE_SIZE: usize = 10;
|
||||
const ROBOT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectorHandle {
|
||||
|
@ -33,4 +34,6 @@ pub enum Error {
|
|||
ConnectionClosed(u32),
|
||||
#[error("No robots to handle requests")]
|
||||
NoRobotsToHandleRequest,
|
||||
#[error("Timed out waiting for response")]
|
||||
Timeout(#[from] tokio::time::error::Elapsed),
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::robot::Error;
|
||||
use crate::robot::{Error, ROBOT_TIMEOUT};
|
||||
use log::{error, info};
|
||||
use prost::Message;
|
||||
use raas_types::raas::cmd::{Command, Request};
|
||||
|
@ -58,7 +58,9 @@ impl RobotConnector {
|
|||
}
|
||||
|
||||
async fn get_response(&mut self) -> Result<Response, Error> {
|
||||
let recv = recv_raas_msg(&mut self.tcp_stream).await?;
|
||||
let recv_timeout = tokio::time::timeout(ROBOT_TIMEOUT, recv_raas_msg(&mut self.tcp_stream));
|
||||
let recv = recv_timeout.await??;
|
||||
|
||||
let resp = Response::decode(recv.msg.as_slice()).unwrap();
|
||||
|
||||
info!("Worker (bot_id={}) got resp", self.bot_id);
|
||||
|
@ -79,53 +81,60 @@ impl RobotConnector {
|
|||
self.is_running = true;
|
||||
|
||||
loop {
|
||||
info!("Worker (bot_id={}) is waiting for requests", self.bot_id);
|
||||
let request = match self.wait_for_request().await {
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to get request: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match self.send_command(request).await {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to send command to bot: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let resp = match self.get_response().await {
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to get response from bot: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match self.respond_to_request(resp).await {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to send response: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
break;
|
||||
}
|
||||
if let Err(res) = self.handle_request().await {
|
||||
error!("Closing robot connection: {}", res);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
error!("Exiting Worker (bot_id={})", self.bot_id);
|
||||
self.is_running = false;
|
||||
}
|
||||
|
||||
async fn handle_request(&mut self) -> Result<(), Error> {
|
||||
info!("Worker (bot_id={}) is waiting for requests", self.bot_id);
|
||||
let request = match self.wait_for_request().await {
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to get request: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
match self.send_command(request).await {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to send command to bot: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
|
||||
let resp = match self.get_response().await {
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to get response from bot: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
match self.respond_to_request(resp).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Worker (bot_id={}) failed to send response: {:?}",
|
||||
self.bot_id, err
|
||||
);
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::robot::{ConnectorManager, Error};
|
||||
use crate::robot::{ConnectorManager, Error, ROBOT_TIMEOUT};
|
||||
use log::{error, info};
|
||||
use prost::Message;
|
||||
use raas_types::raas;
|
||||
|
@ -92,7 +92,9 @@ impl RobotManager {
|
|||
return Err(Error::ConnectionClosed(*id));
|
||||
}
|
||||
|
||||
let ret = recv_raas_msg(&mut connector.stream).await;
|
||||
let timeout = tokio::time::timeout(ROBOT_TIMEOUT, recv_raas_msg(&mut connector.stream));
|
||||
|
||||
let ret = timeout.await?;
|
||||
|
||||
let resp = match ret {
|
||||
Ok(r) => r,
|
||||
|
@ -157,6 +159,11 @@ impl RobotManager {
|
|||
raas::error::ErrorType::NoRobotsToHandleRequest,
|
||||
"No robots to handle request".to_string(),
|
||||
),
|
||||
Error::Timeout(_) => (
|
||||
0,
|
||||
raas::error::ErrorType::RobotError,
|
||||
"Timed out waiting for response from Robot".to_string(),
|
||||
),
|
||||
};
|
||||
|
||||
self.send_error(id, error, error_msg.as_str()).await?;
|
||||
|
|
Loading…
Reference in New Issue