diff --git a/src/websocket.rs b/src/websocket.rs index 5a097de..470b783 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -10,6 +10,58 @@ use tokio::sync::RwLock; use tungstenite::stream::*; use tungstenite::*; +pub async fn websocketinit( + ctxarc: &Arc>, + ipeventtx: Arc>>, +) -> WebSocket> { + let (mut wssocketrr, bootstrap_event); + { + let ctx = ctxarc.read().await; + bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); + + wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone()) + .await + .unwrap(); + send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; + } + + wslistenpubsub(ctxarc, ipeventtx).await; + return wssocketrr; +} + +async fn wslistenpubsub(ctxarc: &Arc>, txpubsub: Arc>>) { + let ctx = ctxarc.read().await; + let cfg = ctx.cfg.ws.get("pubsub").unwrap().clone(); + let hostname = ctx.hostname.clone(); + let mut websocket = Arc::new(RwLock::new( + websocketconnect(&cfg, hostname.clone()).await.unwrap(), + )); + tokio::spawn(async move { + loop { + let mut ws = websocket.write().await; + match ws.read_message() { + Ok(msg) => { + let tosend: IpEvent = serde_json::from_str(msg.to_string().as_str()).unwrap(); + if tosend.ipdata.hostname != gethostname(true) + || tosend.msgtype == "init".to_string() + { + let txps = txpubsub.write().await; + txps.send(tosend).await.unwrap(); + } + } + Err(e) => { + println!("error in pubsub: {e:?}"); + ws.close(None).unwrap(); + drop(ws); + websocket = Arc::new(RwLock::new( + websocketconnect(&cfg, hostname.clone()).await.unwrap(), + )); + } + }; + } + }); +} + pub async fn websocketconnect<'a>( wscfg: &WebSocketCfg, hostname: String, @@ -22,59 +74,6 @@ pub async fn websocketconnect<'a>( Ok(socket) } -pub async fn websocketinit( - ctxarc: &Arc>, - ipeventtx: Arc>>, -) -> WebSocket> { - let (wssocketps, mut wssocketrr, bootstrap_event); - { - let ctx = ctxarc.read().await; - bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); - wssocketps = Arc::new(RwLock::new( - websocketconnect(&ctx.cfg.ws.get("pubsub").unwrap(), ctx.hostname.clone()) - .await - .unwrap(), - )); - wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone()) - .await - .unwrap(); - send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; - } - - wslistenpubsub(wssocketps, ipeventtx).await; - return wssocketrr; -} - -async fn wslistenpubsub( - websocket: Arc>>>, - txpubsub: Arc>>, -) { - tokio::spawn(async move { - loop { - { - let mut ws = websocket.write().await; - match ws.read_message() { - Ok(msg) => { - let tosend: IpEvent = - serde_json::from_str(msg.to_string().as_str()).unwrap(); - if tosend.ipdata.hostname != gethostname(true) - || tosend.msgtype == "init".to_string() - { - let txps = txpubsub.write().await; - txps.send(tosend).await.unwrap(); - } - } - Err(e) => { - println!("error: {e:?}"); - ws.close(None).unwrap(); - return; - } - }; - } - } - }); -} - pub async fn send_to_ipbl_websocket( ws: &mut WebSocket>, ip: &IpEvent, @@ -84,7 +83,8 @@ pub async fn send_to_ipbl_websocket( match ws.write_message(Message::Text(msg)) { Ok(_) => {} Err(e) => { - println!("err 1: {e:?}"); + println!("err send write: {e:?}"); + ws.close(None).unwrap(); return false; } }; @@ -92,7 +92,8 @@ pub async fn send_to_ipbl_websocket( match ws.read_message() { Ok(_) => {} Err(e) => { - println!("err 2: {e:?}"); + println!("err send read: {e:?}"); + ws.close(None).unwrap(); return false; } };