From f29ccd3f0bfadd92e5f07e0dbdd0a14ade0f5ef0 Mon Sep 17 00:00:00 2001 From: Paul Lecuq Date: Tue, 26 Dec 2023 10:42:39 +0100 Subject: [PATCH] updated ipevent with Option --- src/config.rs | 65 ++++++++++++++++++++++------------------------- src/ip.rs | 10 +++++++- src/ipblc.rs | 38 ++++++++++++++++++++++----- src/webservice.rs | 2 +- src/websocket.rs | 11 ++++---- 5 files changed, 77 insertions(+), 49 deletions(-) diff --git a/src/config.rs b/src/config.rs index b400703..886b90e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,7 +17,7 @@ use std::path::Path; pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]); const MASTERSERVER: &str = "ipbl.paulbsd.com"; const WSSUBSCRIPTION: &str = "ipbl"; -const CONFIG_RETRY: u64 = 2; +const CONFIG_RETRY_INTERVAL: u64 = 2; const WEB_CLIENT_TIMEOUT: i64 = 5; #[derive(Debug)] @@ -128,9 +128,9 @@ impl Context { break; } Err(err) => { - println!("error loading config: {err}, retrying in {CONFIG_RETRY}s"); + println!("error loading config: {err}, retrying in {CONFIG_RETRY_INTERVAL}s"); last_in_err = true; - sleep_s(CONFIG_RETRY).await; + sleep_s(CONFIG_RETRY_INTERVAL).await; } }; } @@ -169,22 +169,23 @@ impl Context { } pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option { - match self.cfg.sets.get(&ipevent.ipdata.src) { + let ipdata = &ipevent.ipdata.clone().unwrap(); + match self.cfg.sets.get(&ipdata.src) { Some(set) => { - let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str()) + let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str()) .unwrap() .with_timezone(&chrono::Local); let blocktime = set.blocktime; if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname { - let block = self - .blocklist - .entry(ipevent.ipdata.ip.to_string()) - .or_insert(BlockIpData { - ipdata: ipevent.ipdata.clone(), - tryfail: 0, - starttime, - blocktime, - }); + let block = + self.blocklist + .entry(ipdata.ip.to_string()) + .or_insert(BlockIpData { + ipdata: ipdata.clone(), + tryfail: 0, + starttime, + blocktime, + }); block.tryfail += 1; block.blocktime = blocktime; if block.tryfail >= set.tryfail { @@ -192,9 +193,9 @@ impl Context { } } else { self.blocklist - .entry(ipevent.ipdata.ip.to_string()) + .entry(ipdata.ip.to_string()) .or_insert(BlockIpData { - ipdata: ipevent.ipdata.clone(), + ipdata: ipdata.clone(), tryfail: set.tryfail, starttime, blocktime, @@ -427,13 +428,7 @@ impl Config { msgtype: String::from("bootstrap"), mode: String::from("ws"), hostname: gethostname(true), - ipdata: IpData { - t: 4, - ip: "".to_string(), - src: "".to_string(), - date: "".to_string(), - hostname: "".to_string(), - }, + ipdata: None, } } } @@ -521,13 +516,13 @@ mod test { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), - ipdata: IpData { + ipdata: Some(IpData { t: 4, ip: "1.1.1.1".to_string(), hostname: "test1".to_string(), date: now.to_rfc3339().to_string(), src: "ssh".to_string(), - }, + }), }) .await; } @@ -537,13 +532,13 @@ mod test { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), - ipdata: IpData { + ipdata: Some(IpData { t: 4, ip: "1.1.1.2".to_string(), hostname: "test2".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), - }, + }), }) .await; } @@ -552,13 +547,13 @@ mod test { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), - ipdata: IpData { + ipdata: Some(IpData { t: 4, ip: "1.1.1.3".to_string(), hostname: "testgood".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), - }, + }), }) .await; @@ -566,13 +561,13 @@ mod test { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), - ipdata: IpData { + ipdata: Some(IpData { t: 4, ip: "1.1.1.4".to_string(), hostname: "testgood".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), - }, + }), }) .await; @@ -580,26 +575,26 @@ mod test { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), - ipdata: IpData { + ipdata: Some(IpData { t: 4, ip: "1.1.1.4".to_string(), hostname: "testgood".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), - }, + }), }) .await; ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), - ipdata: IpData { + ipdata: Some(IpData { t: 6, ip: "2a00:1450:4007:805::2003".to_string(), hostname: "testgood".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), - }, + }), }) .await; diff --git a/src/ip.rs b/src/ip.rs index 5d5afa3..510fbe7 100644 --- a/src/ip.rs +++ b/src/ip.rs @@ -22,7 +22,7 @@ pub struct IpEvent { pub msgtype: String, pub mode: String, pub hostname: String, - pub ipdata: IpData, + pub ipdata: Option, } #[macro_export] @@ -35,6 +35,14 @@ macro_rules! ipevent { ipdata: $ipdata, } }; + ($msgtype:expr,$mode:expr,$hostname:expr) => { + IpEvent { + msgtype: String::from($msgtype), + mode: String::from($mode), + hostname: $hostname, + ipdata: None, + } + }; } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/src/ipblc.rs b/src/ipblc.rs index fd83e91..0be1cd1 100644 --- a/src/ipblc.rs +++ b/src/ipblc.rs @@ -65,6 +65,13 @@ pub async fn run() { let ctxclone = Arc::clone(&ctxarc); + let ipe = ipevent!("ping", "ws", gethostname(true)); + if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { + wssocketrr.close(None).unwrap(); + wssocketrr = websocketreqrep(&ctxwsrr).await; + continue; + } + tokio::select! { ipevent = ipeventrx.recv() => { let received_ip = ipevent.unwrap(); @@ -76,7 +83,7 @@ pub async fn run() { if received_ip.msgtype == "bootstrap".to_string() { for ip_to_send in toblock { - let ipe = ipevent!("init","ws",gethostname(true),ip_to_send); + let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send)); if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { wssocketrr = websocketreqrep(&ctxwsrr).await; break; @@ -93,11 +100,10 @@ pub async fn run() { // send ip list to api and ws sockets if let Some(ipevent) = filtered_ipevent { if received_ip.msgtype != "init" { - println!("sending {} to api and ws", ipevent.ipdata.ip); + println!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip); let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata); send_to_ipbl_api(&server.clone(), &ipe).await; - let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; - if !status { + if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { wssocketrr = websocketreqrep(&ctxwsrr).await; continue; } @@ -128,8 +134,26 @@ async fn handle_cfg_reload( ) { let now_cfg_reload = Local::now().trunc_subsecs(0); if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) { - let inotify = inoarc.read().await; - match ctxclone.write().await.load(&inotify).await { + let inotify; + loop { + inotify = match inoarc.try_read() { + Ok(o) => o, + Err(e) => { + println!("{e}"); + sleep_s(1).await; + continue; + } + }; + break; + } + let mut ctxtest = match ctxclone.try_write() { + Ok(o) => o, + Err(e) => { + println!("{e}"); + return; + } + }; + match ctxtest.load(&inotify).await { Ok(_) => { *last_cfg_reload = Local::now().trunc_subsecs(0); } @@ -243,7 +267,7 @@ async fn compare_files_changes( } } for ip in iplist { - let ipe = ipevent!("add", "file", gethostname(true), ip); + let ipe = ipevent!("add", "file", gethostname(true), Some(ip)); let ipetx = ipeventtx.read().await; ipetx.send(ipe).await.unwrap(); } diff --git a/src/webservice.rs b/src/webservice.rs index a4d74f6..d80eee5 100644 --- a/src/webservice.rs +++ b/src/webservice.rs @@ -11,7 +11,7 @@ pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) { let mut try_req = 0; let client = httpclient(); loop { - match push_ip(&client, &server, &ip.ipdata).await { + match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await { Ok(_) => { break; } diff --git a/src/websocket.rs b/src/websocket.rs index 55e82f5..c5a733f 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -14,13 +14,13 @@ use tungstenite::*; pub async fn websocketreqrep( ctxarc: &Arc>, ) -> WebSocket> { - let (mut wssocketrr, bootstrap_event, cfg); + let (mut wssocketrr, bootstrap_event, wscfg); { let ctx = ctxarc.read().await; bootstrap_event = ctx.cfg.bootstrap_event().clone(); - cfg = ctx.cfg.ws.get("reqrep").unwrap().clone(); + wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone(); } - wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap(); + wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap(); send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; return wssocketrr; @@ -45,11 +45,12 @@ pub async fn websocketpubsub( Ok(msg) => { let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) { Ok(o) => o, - Err(_e) => { + Err(e) => { + println!("error in pubsub: {e:?}"); continue; } }; - if tosend.ipdata.hostname != gethostname(true) + if tosend.ipdata.clone().unwrap().hostname != gethostname(true) || tosend.msgtype == "init".to_string() { let txps = txpubsub.read().await;