diff --git a/src/ipblc.rs b/src/ipblc.rs index 469e5bf..ad63fd3 100644 --- a/src/ipblc.rs +++ b/src/ipblc.rs @@ -4,7 +4,7 @@ use crate::ip::{filter, IpData, IpEvent}; use crate::monitoring::apiserver; use crate::utils::{gethostname, read_lines, sleep_ms}; use crate::webservice::send_to_ipbl_api; -use crate::websocket::{send_to_ipbl_websocket, websocketinit}; +use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep}; use chrono::prelude::*; use chrono::prelude::{DateTime, Local}; @@ -37,9 +37,12 @@ pub async fn run() { let (ipeventtx, mut ipeventrx): (Sender, Receiver) = channel(WS_CHAN_SIZE); let ipeventtxarc = Arc::new(RwLock::new(ipeventtx)); - let ctxws = Arc::clone(&ctxarc); - let mut ipeventws = Arc::clone(&ipeventtxarc); - let mut wssocketrr = websocketinit(&ctxws, ipeventws).await; + let ctxwsrr = Arc::clone(&ctxarc); + let ipeventws = Arc::clone(&ipeventtxarc); + let mut wssocketrr = websocketreqrep(&ctxwsrr).await; + + let ctxwsps = Arc::clone(&ctxarc); + websocketpubsub(&ctxwsps, ipeventws).await; let mut blrx = watchfiles(&ctxarc).await; @@ -70,8 +73,8 @@ pub async fn run() { }; let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; if !status { - ipeventws = Arc::clone(&ipeventtxarc); - wssocketrr = websocketinit(&ctxws, ipeventws).await; + wssocketrr = websocketreqrep(&ctxwsrr).await; + continue; } } continue @@ -93,8 +96,8 @@ pub async fn run() { send_to_ipbl_api(&ctx.client, &ctx.flags.server, &ipe).await; let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; if !status { - ipeventws = Arc::clone(&ipeventtxarc); - wssocketrr = websocketinit(&ctxws, ipeventws).await; + wssocketrr = websocketreqrep(&ctxwsrr).await; + continue; } } diff --git a/src/websocket.rs b/src/websocket.rs index 470b783..e39694c 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -3,6 +3,7 @@ use crate::ip::IpEvent; use crate::utils::gethostname; use serde_json::json; +use std::io::{self, Write}; use std::net::TcpStream; use std::sync::Arc; use tokio::sync::mpsc::Sender; @@ -10,9 +11,8 @@ use tokio::sync::RwLock; use tungstenite::stream::*; use tungstenite::*; -pub async fn websocketinit( +pub async fn websocketreqrep( ctxarc: &Arc>, - ipeventtx: Arc>>, ) -> WebSocket> { let (mut wssocketrr, bootstrap_event); { @@ -25,14 +25,17 @@ pub async fn websocketinit( send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; } - wslistenpubsub(ctxarc, ipeventtx).await; return wssocketrr; } -async fn wslistenpubsub(ctxarc: &Arc>, txpubsub: Arc>>) { +pub async fn websocketpubsub( + ctxarc: &Arc>, + txpubsub: Arc>>, +) { let ctx = ctxarc.read().await; let cfg = ctx.cfg.ws.get("pubsub").unwrap().clone(); let hostname = ctx.hostname.clone(); + drop(ctx); let mut websocket = Arc::new(RwLock::new( websocketconnect(&cfg, hostname.clone()).await.unwrap(), )); @@ -66,7 +69,10 @@ pub async fn websocketconnect<'a>( wscfg: &WebSocketCfg, hostname: String, ) -> Result>, Error> { + print!("connecting to {} ...", &wscfg.endpoint); + io::stdout().flush().unwrap(); let (mut socket, _response) = connect(&wscfg.endpoint).expect("Can't connect"); + print!(" connected!\n"); let msg = json!({ "hostname": hostname }); socket .write_message(Message::Text(msg.to_string()))