From ebd969f6f83b47c10eceabfbf41d21fd9bf4b46c Mon Sep 17 00:00:00 2001 From: Paul Lecuq Date: Sun, 11 Sep 2022 23:35:44 +0200 Subject: [PATCH] handle fetch of already active ip addresses on other nodes --- src/config/mod.rs | 11 ++++++++++- src/ip.rs | 4 ++++ src/ipblc/inc.rs | 21 +++++++++++++++++++-- src/ipblc/mod.rs | 6 +++--- 4 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 39c50f6..2c4cd89 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -177,13 +177,17 @@ impl Context { return a.ipdata.clone(); } else { let now = Local::now().trunc_subsecs(0); + let mut tryfail = 0; + if ipdata.mode == "zmq".to_string() { + tryfail = 100; + } match self.cfg.sets.get(&ipdata.src) { Some(set) => { self.blocklist.insert( ipdata.ip.to_string(), BlockIpData { ipdata: ipdata.clone(), - tryfail: 0, + tryfail, starttime: now, blocktime: set.blocktime, }, @@ -454,6 +458,7 @@ mod test { hostname: "test1".to_string(), date: now.to_rfc3339().to_string(), src: "ssh".to_string(), + mode: "file".to_string(), }) .await; } @@ -464,6 +469,7 @@ mod test { hostname: "test2".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), + mode: "file".to_string(), }) .await; } @@ -473,6 +479,7 @@ mod test { hostname: "testgood".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), + mode: "file".to_string(), }) .await; @@ -481,6 +488,7 @@ mod test { hostname: "testgood".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), + mode: "file".to_string(), }) .await; @@ -489,6 +497,7 @@ mod test { hostname: "testgood".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), + mode: "file".to_string(), }) .await; diff --git a/src/ip.rs b/src/ip.rs index a18f2dc..7283810 100644 --- a/src/ip.rs +++ b/src/ip.rs @@ -24,6 +24,7 @@ pub struct IpData { pub src: String, pub date: String, pub hostname: String, + pub mode: String, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -77,6 +78,7 @@ pub async fn push_ip(ctx: &Context, ip: &IpData, ret: &mut Vec) -> Resul src: ip.src.to_string(), date: ip.date.to_string(), hostname: ip.hostname.to_string(), + mode: "file".to_string(), }); let resp = ctx @@ -113,6 +115,7 @@ pub async fn _push_ip_bulk( src: ip.src.to_string(), date: ip.date.to_string(), hostname: ip.hostname.to_string(), + mode: "file".to_string(), }) } @@ -195,6 +198,7 @@ pub fn filter( src: src.to_owned(), date: s_date.to_rfc3339().to_owned(), hostname: hostname.to_owned(), + mode: "file".to_owned(), }); ips += 1; }; diff --git a/src/ipblc/inc.rs b/src/ipblc/inc.rs index 20a7685..39867be 100644 --- a/src/ipblc/inc.rs +++ b/src/ipblc/inc.rs @@ -45,17 +45,34 @@ pub async fn process(ctx: &Arc>) { compare_files_changes(&ctxarc, &mut blrx, &ipdatatx).await; }); + let mut ip = IpData { + ip: "".to_string(), + src: "".to_string(), + date: "".to_string(), + hostname: "".to_string(), + mode: "init".to_string(), + }; + send_to_ipbl_zmq(&reqsocket, &mut ip).await; + loop { let mut ret: Vec = Vec::new(); let begin: DateTime = Local::now().trunc_subsecs(0); // wait for logs parse and zmq channel receive - let ip = ipdatarx.recv().await.unwrap(); + let mut ip = ipdatarx.recv().await.unwrap(); // lock the context mutex let ctxarc = Arc::clone(&ctx); let mut ctx = ctxarc.lock().await; + if ip.mode == "init" { + for i in &mut ctx.get_blocklist_toblock().await { + i.mode = "zmq".to_string(); + send_to_ipbl_zmq(&reqsocket, i).await; + } + continue; + } + // refresh context blocklist ctx.update_blocklist(&ip).await; ctx.gc_blocklist().await; @@ -63,7 +80,7 @@ pub async fn process(ctx: &Arc>) { // send ip list to ws and zmq sockets if ip.hostname == ctx.hostname { send_to_ipbl_ws(&ctx, &ip, &mut ret).await; - send_to_ipbl_zmq(&reqsocket, &ip).await; + send_to_ipbl_zmq(&reqsocket, &mut ip).await; } // apply firewall blocking diff --git a/src/ipblc/mod.rs b/src/ipblc/mod.rs index 3c3a2d7..2745c3b 100644 --- a/src/ipblc/mod.rs +++ b/src/ipblc/mod.rs @@ -23,15 +23,15 @@ impl std::fmt::Debug for FileEvent { } } -async fn send_to_ipbl_zmq(socket: &zmq::Socket, ip: &IpData) { +async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) { let msg = format!("{value}", value = serde_json::to_string(&ip).unwrap()); - match socket.send(&msg, 0) { + match reqsocket.send(&msg, 0) { Ok(_) => {} Err(e) => { println!("{e:?}") } }; - match socket.recv_string(0) { + match reqsocket.recv_string(0) { Ok(o) => match o { Ok(_) => {} Err(ee) => {