From c5854ee5aefa73fec1d6d145f63b97badb82710b Mon Sep 17 00:00:00 2001 From: Joey Hines Date: Sun, 26 May 2024 21:11:16 -0600 Subject: [PATCH] Protobuf client --- .gitignore | 1 + Cargo.lock | 6 +- Cargo.toml | 4 +- src/client.rs | 222 ++++++++++++++++++++++++++++++++++++++++++++++++ src/config.rs | 61 +++++++++++++ src/main.rs | 38 +++++---- src/roll_bot.rs | 21 ++--- 7 files changed, 321 insertions(+), 32 deletions(-) create mode 100644 src/client.rs create mode 100644 src/config.rs diff --git a/.gitignore b/.gitignore index d81f12e..9acf73d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target /.idea +config.toml \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index fc988c2..f33d003 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -657,9 +657,9 @@ dependencies = [ [[package]] name = "raas_types" -version = "0.0.1" +version = "0.0.2" source = "registry+https://git.jojodev.com/joeyahines/_cargo-index.git" -checksum = "35ffd19aaa3beca193b58d06c98565f308083453854fda62bd1d11cbc699d807" +checksum = "6104c0c441473bc9c0817f9958a1c1e0db9ea7c72a7fee826b84a1d9d1baea62" dependencies = [ "bytes", "prost", @@ -702,8 +702,10 @@ dependencies = [ "config", "env_logger", "log", + "prost", "raas_types", "rppal", + "serde", "structopt", "thiserror", ] diff --git a/Cargo.toml b/Cargo.toml index a38a18e..9b86e5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,9 +9,11 @@ edition = "2021" structopt = "0.3.26" config = "0.14.0" rppal = "0.18.0" -raas_types = { version = "0.0.1", registry = "jojo-dev"} +raas_types = { version = "0.0.2", registry = "jojo-dev"} log = "0.4.21" env_logger = "0.11.3" thiserror = "1.0.61" +serde = { version = "1.0.203", features = ["derive"] } +prost = "0.12.6" diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..655dfe1 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,222 @@ +use crate::config::RollBotConfig; +use crate::roll_bot::RollBot; +use log::{error, info, warn}; +use prost::Message; +use raas_types::raas::bot::roll::{roll_cmd, Roll, RollCmd, RollImage, RollResponse}; +use raas_types::raas::cmd::command::Cmd; +use raas_types::raas::cmd::Command; +use raas_types::raas::ping::{Ping, Pong}; +use raas_types::raas::register::{BotTypes, Register, RegisterResponse}; +use raas_types::raas::resp::response::Resp; +use raas_types::raas::resp::Response; +use raas_types::raas::RaasMessage; +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::time::{SystemTime, UNIX_EPOCH}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Roll Bot Error")] + RollBot(#[from] crate::roll_bot::Error), + #[error("Network Error")] + IO(#[from] std::io::Error), + #[error("Protobuf Encode Error")] + ProtoBufEncode(#[from] prost::EncodeError), + #[error("Protobuf Decode Error")] + ProtoBufDecode(#[from] prost::DecodeError), + #[error("Invalid command ID")] + InvalidCommandID, + #[error("Invalid command")] + InvalidCommand, +} + +pub struct Client { + config: RollBotConfig, + roll_bot: RollBot, + + id: Option, +} + +impl Client { + pub fn new(config: RollBotConfig, roll_bot: RollBot) -> Self { + Self { + config, + roll_bot, + id: None, + } + } + + fn try_connect(&self) -> Result { + for attempt in 1..10 { + info!("Attempting to connect to {}", self.config.server_addr); + let socket = TcpStream::connect(self.config.server_addr.clone()); + + if let Ok(socket) = socket { + return Ok(socket); + } + + let wait = attempt * 10; + warn!("Connection failed, waiting {}s...", wait); + std::thread::sleep(std::time::Duration::from_secs(wait)) + } + + Err(std::io::Error::last_os_error().into()) + } + + fn register(&mut self, socket: &mut TcpStream) -> Result<(), Error> { + let register_msg = Register { + name: "Roll Bot".to_string(), + bot_type: BotTypes::Roll.into(), + }; + + let mut message = Vec::new(); + register_msg.encode(&mut message)?; + + Self::send_packet(socket, message)?; + + let resp = Self::receive_packet(socket)?; + + let register_msg = RegisterResponse::decode(&*resp.msg)?; + + info!( + "Registered '{}' as id {}", + register_msg.name, register_msg.id + ); + + self.id = Some(register_msg.id); + + Ok(()) + } + + fn receive_packet(socket: &mut TcpStream) -> Result { + let mut message_len_bytes = [0u8; 4]; + socket.read_exact(&mut message_len_bytes)?; + + let len = u32::from_be_bytes(message_len_bytes); + + let mut message = vec![0u8; len as usize]; + + socket.read_exact(&mut message)?; + + Ok(RaasMessage { len, msg: message }) + } + + fn send_packet(socket: &mut TcpStream, data: Vec) -> Result<(), Error> { + let msg = RaasMessage::new(data); + + socket.write_all(&msg.into_bytes())?; + Ok(()) + } + + fn get_next_command(&mut self, socket: &mut TcpStream) -> Result { + let msg = Self::receive_packet(socket)?; + + let command = Command::decode(&*msg.msg)?; + + info!( + "Received command for id={} timestamp={}", + command.id, command.timestamp + ); + + if command.id != self.id.unwrap() { + warn!("Command for a different ID was received, dropping"); + return Err(Error::InvalidCommandID); + } + + match command.cmd.unwrap() { + Cmd::RollCmd(cmd) => Ok(cmd), + #[allow(unreachable_patterns)] + _ => { + warn!("Invalid command id received"); + Err(Error::InvalidCommand) + } + } + } + + fn handle_ping(&mut self, ping: Ping) -> Result { + info!("Received ping w/ data {:?}", ping.data); + + let pong = Pong { + ping: Some(ping), + is_ok: true, + }; + + let resp = RollResponse { + response: Some(raas_types::raas::bot::roll::roll_response::Response::Pong( + pong, + )), + }; + + Ok(resp) + } + + fn handle_roll(&mut self, roll: Roll) -> Result { + info!("Doing roll..."); + self.roll_bot.roll(roll.rotations)?; + + let resp = RollResponse { + response: Some( + raas_types::raas::bot::roll::roll_response::Response::RollImage(RollImage { + img: vec![], + }), + ), + }; + + Ok(resp) + } + + fn handle_command(&mut self, socket: &mut TcpStream, roll_cmd: RollCmd) -> Result<(), Error> { + let roll_resp = match roll_cmd.cmd.unwrap() { + roll_cmd::Cmd::Ping(ping) => self.handle_ping(ping), + roll_cmd::Cmd::Roll(roll) => self.handle_roll(roll), + }?; + + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let resp = Response { + id: self.id.unwrap(), + timestamp, + resp: Some(Resp::RollResp(roll_resp)), + }; + + let mut message = vec![0u8; 1024]; + + resp.encode(&mut message)?; + + Self::send_packet(socket, message)?; + + Ok(()) + } + + pub fn run_client(&mut self) -> Result<(), Error> { + loop { + let mut socket = self.try_connect()?; + + self.register(&mut socket)?; + + loop { + let roll_cmd_res = self.get_next_command(&mut socket); + + if let Ok(roll_cmd) = roll_cmd_res { + self.handle_command(&mut socket, roll_cmd)?; + } else if let Err(roll_cmd_err) = roll_cmd_res { + match roll_cmd_err { + Error::IO(io_error) => { + warn!("Connection issue, reattempting connection: {}", io_error); + self.id = None; + break; + } + Error::InvalidCommandID | Error::InvalidCommand => { + continue; + } + _ => return Err(roll_cmd_err), + } + } + } + } + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..2dda73e --- /dev/null +++ b/src/config.rs @@ -0,0 +1,61 @@ +use config::Config; +use rppal::pwm::Channel; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use structopt::StructOpt; + +#[derive(StructOpt)] +pub struct Args { + pub config_file: PathBuf, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ChannelConfig { + Pwm0, + Pwm1, +} + +impl From for Channel { + fn from(value: ChannelConfig) -> Self { + match value { + ChannelConfig::Pwm0 => Channel::Pwm0, + ChannelConfig::Pwm1 => Channel::Pwm1, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PWMStep { + pub pulse_us: u64, + pub delay_ms: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PWMConfig { + pub pwm_channel: ChannelConfig, + pub period_ms: u64, + pub pulse_min_us: u64, + pub pulse_max_us: u64, + pub steps: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RollBotConfig { + pub server_addr: String, + + pub pwm_config: PWMConfig, +} + +impl RollBotConfig { + pub fn new(config: PathBuf) -> Self { + let daemon_config = Config::builder() + .add_source(config::File::new( + config.to_str().unwrap(), + config::FileFormat::Toml, + )) + .build() + .unwrap(); + + daemon_config.try_deserialize().unwrap() + } +} diff --git a/src/main.rs b/src/main.rs index 61da131..700b7ea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,27 +1,30 @@ +mod client; +mod config; mod roll_bot; use structopt::StructOpt; +use log::{error, info}; use std::error::Error; use std::time::Duration; -use log::info; +use crate::client::Client; +use crate::config::Args; use crate::roll_bot::RollBot; -use rppal::pwm::{Channel, Polarity, Pwm}; - -const PERIOD_MS: u64 = 20; -const PULSE_MIN_US: u64 = 1000; -const PULSE_NEUTRAL_US: u64 = 1500; -const PULSE_MAX_US: u64 = 2000; +use rppal::pwm::{Polarity, Pwm}; fn main() -> Result<(), Box> { env_logger::init(); + let args = Args::from_args(); + + let config = config::RollBotConfig::new(args.config_file); + info!("Initializing PWM"); let pwm = Pwm::with_period( - Channel::Pwm0, - Duration::from_millis(PERIOD_MS), - Duration::from_micros(PULSE_MAX_US), + config.pwm_config.pwm_channel.clone().into(), + Duration::from_millis(config.pwm_config.period_ms), + Duration::from_micros(config.pwm_config.pulse_max_us), Polarity::Normal, true, )?; @@ -29,15 +32,20 @@ fn main() -> Result<(), Box> { info!("Initializing Roll Bot"); let roll_bot = RollBot { pwm, - move_time: Duration::from_millis(1000), + steps: config.pwm_config.steps.clone(), }; - roll_bot.set_pwm_output(Duration::from_micros(PULSE_MIN_US))?; + roll_bot.set_pwm_output(Duration::from_micros(config.pwm_config.pulse_min_us))?; - loop { - roll_bot.roll(5)?; - std::thread::sleep(Duration::from_millis(1000)); + let mut client = Client::new(config, roll_bot); + + let res = client.run_client(); + + if let Err(err) = res { + error!("Error in client: {}", err); } + info!("Roll Bot shutting down..."); + Ok(()) } diff --git a/src/roll_bot.rs b/src/roll_bot.rs index 4f471fd..9bd223b 100644 --- a/src/roll_bot.rs +++ b/src/roll_bot.rs @@ -1,7 +1,7 @@ -use crate::{PULSE_MAX_US, PULSE_MIN_US, PULSE_NEUTRAL_US}; +use crate::config::PWMStep; use log::info; use rppal::pwm::Pwm; -use std::thread; +use std::thread::sleep; use std::time::Duration; use thiserror::Error; @@ -13,14 +13,13 @@ pub enum Error { pub struct RollBot { pub pwm: Pwm, - pub move_time: Duration, + pub steps: Vec, } impl RollBot { pub fn set_pwm_output(&self, pulse: Duration) -> Result<(), Error> { info!("Going to {:?}", pulse); - self.pwm - .set_pulse_width(pulse)?; + self.pwm.set_pulse_width(pulse)?; Ok(()) } @@ -28,15 +27,9 @@ impl RollBot { pub fn roll(&self, tumble_times: u32) -> Result<(), Error> { for tumble in 1..=tumble_times { info!("Doing Tumble {}", tumble); - - for step in (PULSE_MIN_US..=PULSE_MAX_US).step_by(1000) { - self.set_pwm_output(Duration::from_micros(step))?; - thread::sleep(self.move_time); - } - - for step in (PULSE_MIN_US..=PULSE_MAX_US).rev().step_by(1000) { - self.set_pwm_output(Duration::from_micros(step))?; - thread::sleep(self.move_time); + for step in &self.steps { + self.set_pwm_output(Duration::from_micros(step.pulse_us))?; + sleep(Duration::from_millis(step.delay_ms)); } }