use crate::config::{Context, WebSocketCfg}; use crate::ip::IpEvent; use crate::utils::gethostname; use serde_json::json; use std::net::TcpStream; use std::sync::Arc; use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; use tungstenite::stream::*; use tungstenite::*; pub async fn websocketconnect<'a>( wscfg: &WebSocketCfg, hostname: String, ) -> Result>, Error> { let (mut socket, _response) = connect(&wscfg.endpoint).expect("Can't connect"); let msg = json!({ "hostname": hostname }); socket .write_message(Message::Text(msg.to_string())) .unwrap(); Ok(socket) } pub async fn websocketinit( ctxarc: &Arc>, ipeventtx: Arc>>, ) -> WebSocket> { let (wssocketps, mut wssocketrr, bootstrap_event); { let ctx = ctxarc.read().await; bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); wssocketps = Arc::new(RwLock::new( websocketconnect(&ctx.cfg.ws.get("pubsub").unwrap(), ctx.hostname.clone()) .await .unwrap(), )); wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone()) .await .unwrap(); send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; } wslistenpubsub(wssocketps, ipeventtx).await; return wssocketrr; } async fn wslistenpubsub( websocket: Arc>>>, txpubsub: Arc>>, ) { tokio::spawn(async move { loop { { let mut ws = websocket.write().await; match ws.read_message() { Ok(msg) => { let tosend: IpEvent = serde_json::from_str(msg.to_string().as_str()).unwrap(); if tosend.ipdata.hostname != gethostname(true) || tosend.msgtype == "init".to_string() { let txps = txpubsub.write().await; txps.send(tosend).await.unwrap(); } } Err(e) => { println!("error: {e:?}"); ws.close(None).unwrap(); return; } }; } } }); } pub async fn send_to_ipbl_websocket( ws: &mut WebSocket>, ip: &IpEvent, ) -> bool { let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); match ws.write_message(Message::Text(msg)) { Ok(_) => {} Err(e) => { println!("err 1: {e:?}"); return false; } }; match ws.read_message() { Ok(_) => {} Err(e) => { println!("err 2: {e:?}"); return false; } }; true }