diff --git a/src/ipblc.rs b/src/ipblc.rs index ecb5310..f1e0941 100644 --- a/src/ipblc.rs +++ b/src/ipblc.rs @@ -20,10 +20,8 @@ const BL_CHAN_SIZE: usize = 32; const WS_CHAN_SIZE: usize = 64; pub async fn run() { - let fqdn = gethostname(true); let globalctx = Context::new().await; let ctxarc = Arc::new(RwLock::new(globalctx)); - let mut ret: Vec = Vec::new(); let mut fwlen: usize = 0; let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); @@ -37,22 +35,22 @@ pub async fn run() { // initialize sockets let (ipeventtx, mut ipeventrx): (Sender, Receiver) = channel(WS_CHAN_SIZE); + let ipeventtxarc = Arc::new(RwLock::new(ipeventtx)); + let ctxws = Arc::clone(&ctxarc); - let mut wssocketrr = websocketinit(&ctxws, &ipeventtx).await; + let mut ipeventws = Arc::clone(&ipeventtxarc); + let mut wssocketrr = websocketinit(&ctxws, ipeventws).await; let mut blrx = watchfiles(&ctxarc).await; let ctxclone = Arc::clone(&ctxarc); + let ipeventclone = Arc::clone(&ipeventtxarc); tokio::spawn(async move { - compare_files_changes(&ctxclone, &mut blrx, &ipeventtx).await; + compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await; }); - let bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); - - send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event, &mut ret).await; - loop { - ret = Vec::new(); + let mut ret: Vec = Vec::new(); let ctxclone = Arc::clone(&ctxarc); @@ -67,10 +65,10 @@ pub async fn run() { let ipe = IpEvent{ msgtype: String::from("init"), mode: String::from("ws"), - hostname: fqdn.clone(), + hostname: gethostname(true), ipdata: ip_to_send, }; - send_to_ipbl_websocket(&mut wssocketrr, &ipe, &mut ret).await; + send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; } continue } @@ -82,14 +80,19 @@ pub async fn run() { if let Some(ipevent) = filtered_ipevent { if received_ip.msgtype != "init" { println!("sending {} to api and ws", ipevent.ipdata.ip); - let event = IpEvent{ + let ipe = IpEvent{ msgtype: String::from("add"), mode: String::from("ws"), - hostname: fqdn.clone(), + hostname: gethostname(true), ipdata: ipevent.ipdata, }; - send_to_ipbl_api(&ctx.client, &ctx.hostname, &ctx.flags.server, &event, &mut ret).await; - send_to_ipbl_websocket(&mut wssocketrr, &event, &mut ret).await; + send_to_ipbl_api(&ctx.client, &ctx.flags.server, &ipe).await; + let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; + if !status { + ipeventws = Arc::clone(&ipeventtxarc); + wssocketrr = websocketinit(&ctxws, ipeventws).await; + } + } } } @@ -170,7 +173,7 @@ async fn get_last_file_size(w: &mut HashMap, path: &str) -> (u64, b async fn compare_files_changes( ctxarc: &Arc>, inrx: &mut Receiver, - ipeventtx: &Sender, + ipeventtx: &Arc>>, ) { let mut tnets; loop { @@ -236,7 +239,8 @@ async fn compare_files_changes( mode: String::from("file"), ipdata: ip, }; - ipeventtx.send(ipevent).await.unwrap(); + let ipetx = ipeventtx.write().await; + ipetx.send(ipevent).await.unwrap(); } } None => {} diff --git a/src/webservice.rs b/src/webservice.rs index d312a83..a8a6b2f 100644 --- a/src/webservice.rs +++ b/src/webservice.rs @@ -5,17 +5,10 @@ use crate::utils::sleep_s; use reqwest::Client; use reqwest::Error as ReqError; -pub async fn send_to_ipbl_api( - client: &Client, - hostname: &str, - server: &str, - ip: &IpEvent, - ret: &mut Vec, -) { - ret.push(format!("host: {hostname}", hostname = hostname)); +pub async fn send_to_ipbl_api(client: &Client, server: &str, ip: &IpEvent) { let mut i = 1; loop { - match push_ip(&client, &server, &ip.ipdata, ret).await { + match push_ip(&client, &server, &ip.ipdata).await { Ok(_) => { break; } @@ -31,13 +24,7 @@ pub async fn send_to_ipbl_api( } } -async fn push_ip( - client: &Client, - server: &str, - ip: &IpData, - ret: &mut Vec, -) -> Result<(), ReqError> { - let result: String; +async fn push_ip(client: &Client, server: &str, ip: &IpData) -> Result<(), ReqError> { let mut data: Vec = vec![]; data.push(IpData { @@ -47,22 +34,12 @@ async fn push_ip( hostname: ip.hostname.to_string(), }); - let resp = client + client .post(format!("{server}/ips")) .json(&data) .send() .await?; - ret.push(format!("status: {status}", status = resp.status())); - let res = resp.text().await.unwrap(); - - if res.trim().len() > 0 { - result = res.trim().to_string(); - } else { - result = "".to_string(); - } - - ret.push(format!("response: {result}")); Ok(()) } diff --git a/src/websocket.rs b/src/websocket.rs index 6db2602..5a097de 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -24,11 +24,12 @@ pub async fn websocketconnect<'a>( pub async fn websocketinit( ctxarc: &Arc>, - ipeventtx: &Sender, + ipeventtx: Arc>>, ) -> WebSocket> { - let (wssocketps, wssocketrr); + let (wssocketps, mut wssocketrr, bootstrap_event); { let ctx = ctxarc.read().await; + bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); wssocketps = Arc::new(RwLock::new( websocketconnect(&ctx.cfg.ws.get("pubsub").unwrap(), ctx.hostname.clone()) .await @@ -37,15 +38,16 @@ pub async fn websocketinit( wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone()) .await .unwrap(); + send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; } - wslistenpubsub(wssocketps, ipeventtx.clone()).await; + wslistenpubsub(wssocketps, ipeventtx).await; return wssocketrr; } async fn wslistenpubsub( websocket: Arc>>>, - txpubsub: Sender, + txpubsub: Arc>>, ) { tokio::spawn(async move { loop { @@ -58,7 +60,8 @@ async fn wslistenpubsub( if tosend.ipdata.hostname != gethostname(true) || tosend.msgtype == "init".to_string() { - txpubsub.send(tosend).await.unwrap(); + let txps = txpubsub.write().await; + txps.send(tosend).await.unwrap(); } } Err(e) => { @@ -75,21 +78,23 @@ async fn wslistenpubsub( pub async fn send_to_ipbl_websocket( ws: &mut WebSocket>, ip: &IpEvent, - _ret: &mut Vec, -) { +) -> bool { let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); match ws.write_message(Message::Text(msg)) { Ok(_) => {} Err(e) => { - println!("err 1: {e:?}") + println!("err 1: {e:?}"); + return false; } }; match ws.read_message() { Ok(_) => {} Err(e) => { - println!("err 2: {e:?}") + println!("err 2: {e:?}"); + return false; } }; + true }