diff --git a/src/config.rs b/src/config.rs index 172d4b8..43e86e3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -16,7 +16,6 @@ use std::path::Path; pub const GIT_VERSION: &str = git_version!(); const MASTERSERVER: &str = "ipbl.paulbsd.com"; -const ZMQSUBSCRIPTION: &str = "ipbl"; const WSSUBSCRIPTION: &str = "ipbl"; const CONFIG_RETRY: u64 = 10; @@ -288,7 +287,6 @@ pub struct Config { #[serde(skip_serializing)] pub trustnets: Vec, pub ws: HashMap, - pub zmq: HashMap, pub api: String, } @@ -348,17 +346,6 @@ impl Config { endpoint: format!("wss://{}/wsrr", MASTERSERVER.to_string()), subscription: WSSUBSCRIPTION.to_string(), })]), - zmq: HashMap::from([("pubsub".to_string(),ZMQCfg{ - t: "pubsub".to_string(), - hostname: MASTERSERVER.to_string(), - port: 9999, - subscription: ZMQSUBSCRIPTION.to_string(), - }),("reqrep".to_string(),ZMQCfg { - t: "reqrep".to_string(), - hostname: MASTERSERVER.to_string(), - port: 9998, - subscription: String::new(), - })]), api: String::from("127.0.0.1:8060") } } @@ -367,7 +354,6 @@ impl Config { self.get_global_config(&ctx).await?; self.get_trustnets(&ctx).await?; self.get_sets(&ctx).await?; - self.get_zmq_config(&ctx).await?; self.get_ws_config(&ctx).await?; Ok(()) } @@ -445,30 +431,6 @@ impl Config { Ok(()) } - async fn get_zmq_config(&mut self, ctx: &Context) -> Result<(), ReqError> { - let resp: Result = ctx - .client - .get(format!("{server}/config/zmq", server = ctx.flags.server)) - .send() - .await; - let req = match resp { - Ok(re) => re, - Err(err) => return Err(err), - }; - let data: HashMap = match req.json::>().await { - Ok(res) => { - let mut out: HashMap = HashMap::new(); - res.into_iter().map(|x| x).for_each(|x| { - out.insert(x.t.to_string(), x); - }); - out - } - Err(err) => return Err(err), - }; - self.zmq = data; - Ok(()) - } - async fn get_ws_config(&mut self, ctx: &Context) -> Result<(), ReqError> { let resp: Result = ctx .client @@ -509,7 +471,7 @@ impl Config { pub fn bootstrap_event(&self) -> IpEvent { IpEvent { msgtype: String::from("bootstrap"), - mode: String::from("socket"), + mode: String::from("ws"), hostname: gethostname(true), ipdata: IpData { ip: "".to_string(), @@ -537,15 +499,6 @@ pub struct Set { pub tryfail: i64, } -#[derive(Debug, Deserialize, Serialize, Clone)] -pub struct ZMQCfg { - #[serde(rename = "type")] - pub t: String, - pub hostname: String, - pub port: i64, - pub subscription: String, -} - #[derive(Debug, Deserialize, Serialize, Clone)] pub struct WebSocketCfg { #[serde(rename = "type")] @@ -592,7 +545,7 @@ mod test { for _i in 0..10 { ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), - mode: String::from("zmq"), + mode: String::from("ws"), hostname: String::from("localhost"), ipdata: IpData { ip: "1.1.1.1".to_string(), @@ -607,7 +560,7 @@ mod test { for _ in 0..10 { ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), - mode: String::from("zmq"), + mode: String::from("ws"), hostname: String::from("localhost"), ipdata: IpData { ip: "1.1.1.2".to_string(), @@ -621,7 +574,7 @@ mod test { ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), - mode: String::from("zmq"), + mode: String::from("ws"), hostname: String::from("localhost"), ipdata: IpData { ip: "1.1.1.3".to_string(), @@ -634,7 +587,7 @@ mod test { ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), - mode: String::from("zmq"), + mode: String::from("ws"), hostname: String::from("localhost"), ipdata: IpData { ip: "1.1.1.4".to_string(), @@ -647,7 +600,7 @@ mod test { ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), - mode: String::from("zmq"), + mode: String::from("ws"), hostname: String::from("localhost"), ipdata: IpData { ip: "1.1.1.4".to_string(), diff --git a/src/ipblc.rs b/src/ipblc.rs index cf84282..ecb5310 100644 --- a/src/ipblc.rs +++ b/src/ipblc.rs @@ -38,7 +38,7 @@ pub async fn run() { // initialize sockets let (ipeventtx, mut ipeventrx): (Sender, Receiver) = channel(WS_CHAN_SIZE); let ctxws = Arc::clone(&ctxarc); - let wssocketrr = websocketinit(&ctxws, &ipeventtx).await; + let mut wssocketrr = websocketinit(&ctxws, &ipeventtx).await; let mut blrx = watchfiles(&ctxarc).await; @@ -49,7 +49,7 @@ pub async fn run() { let bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); - send_to_ipbl_websocket(&wssocketrr, &bootstrap_event, &mut ret).await; + send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event, &mut ret).await; loop { ret = Vec::new(); @@ -57,8 +57,8 @@ pub async fn run() { let ctxclone = Arc::clone(&ctxarc); tokio::select! { - val = ipeventrx.recv() => { - let received_ip = val.unwrap(); + ipevent = ipeventrx.recv() => { + let received_ip = ipevent.unwrap(); let mut ctx = ctxclone.write().await; @@ -70,7 +70,7 @@ pub async fn run() { hostname: fqdn.clone(), ipdata: ip_to_send, }; - send_to_ipbl_websocket(&wssocketrr, &ipe, &mut ret).await; + send_to_ipbl_websocket(&mut wssocketrr, &ipe, &mut ret).await; } continue } @@ -84,15 +84,12 @@ pub async fn run() { println!("sending {} to api and ws", ipevent.ipdata.ip); let event = IpEvent{ msgtype: String::from("add"), - mode: String::from("socket"), + mode: String::from("ws"), hostname: fqdn.clone(), ipdata: ipevent.ipdata, }; - println!("blabla1"); send_to_ipbl_api(&ctx.client, &ctx.hostname, &ctx.flags.server, &event, &mut ret).await; - println!("blabla2"); - send_to_ipbl_websocket(&wssocketrr, &event, &mut ret).await; - println!("blabla3"); + send_to_ipbl_websocket(&mut wssocketrr, &event, &mut ret).await; } } } diff --git a/src/websocket.rs b/src/websocket.rs index 4c2467c..6db2602 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -1,6 +1,6 @@ use crate::config::{Context, WebSocketCfg}; use crate::ip::IpEvent; -use crate::utils::{gethostname, sleep_ms}; +use crate::utils::gethostname; use serde_json::json; use std::net::TcpStream; @@ -25,8 +25,8 @@ pub async fn websocketconnect<'a>( pub async fn websocketinit( ctxarc: &Arc>, ipeventtx: &Sender, -) -> Arc>>> { - let (wssocketps, wssocketreqrep); +) -> WebSocket> { + let (wssocketps, wssocketrr); { let ctx = ctxarc.read().await; wssocketps = Arc::new(RwLock::new( @@ -34,15 +34,13 @@ pub async fn websocketinit( .await .unwrap(), )); - wssocketreqrep = Arc::new(RwLock::new( - websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone()) - .await - .unwrap(), - )); + wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone()) + .await + .unwrap(); } wslistenpubsub(wssocketps, ipeventtx.clone()).await; - return wssocketreqrep; + return wssocketrr; } async fn wslistenpubsub( @@ -51,99 +49,45 @@ async fn wslistenpubsub( ) { tokio::spawn(async move { loop { - let msgs: Option; { let mut ws = websocket.write().await; - msgs = match ws.read_message() { - Ok(s) => { - println!("msg: {}", s); - None - } - Err(e) => { - println!("error: {e:?}"); - ws.close(None).unwrap(); - return; - } - }; - match msgs { - Some(ss) => { - let tosend: IpEvent = serde_json::from_str(ss.as_str()).unwrap(); + match ws.read_message() { + Ok(msg) => { + let tosend: IpEvent = + serde_json::from_str(msg.to_string().as_str()).unwrap(); if tosend.ipdata.hostname != gethostname(true) || tosend.msgtype == "init".to_string() { txpubsub.send(tosend).await.unwrap(); } } - None => {} - }; - } - } - }); -} - -async fn wslistenreqrep( - websocket: Arc>>>, - txpubsub: Sender, -) { - tokio::spawn(async move { - loop { - let msgs: Option; - { - let mut ws = websocket.write().await; - msgs = match ws.read_message() { - Ok(s) => { - println!("msg: {}", s); - None - } Err(e) => { println!("error: {e:?}"); ws.close(None).unwrap(); return; } }; - match msgs { - Some(ss) => { - let tosend: IpEvent = serde_json::from_str(ss.as_str()).unwrap(); - if tosend.ipdata.hostname != gethostname(true) - || tosend.msgtype == "init".to_string() - { - txpubsub.send(tosend).await.unwrap(); - } - } - None => {} - }; } } }); } pub async fn send_to_ipbl_websocket( - wssocket: &Arc>>>, + ws: &mut WebSocket>, ip: &IpEvent, _ret: &mut Vec, ) { let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); - println!("testarc"); - let wsclone = Arc::clone(wssocket); - println!("testarc2"); - let mut ws = wsclone.write().await; - - println!("write"); match ws.write_message(Message::Text(msg)) { - Ok(o) => { - println!("{o:?}") - } + Ok(_) => {} Err(e) => { println!("err 1: {e:?}") } }; - println!("read"); match ws.read_message() { - Ok(o) => { - println!("{o}") - } + Ok(_) => {} Err(e) => { println!("err 2: {e:?}") }