From e8c717221992af154df9714282a0529acb9847de Mon Sep 17 00:00:00 2001 From: Paul Lecuq Date: Sun, 15 Jan 2023 16:05:34 +0100 Subject: [PATCH] changed IpData->IpEvent{IpData} struct --- src/ip.rs | 6 ++++++ src/ipblc.rs | 54 ++++++++++++++++++++++++++++++++------------------- src/zmqcom.rs | 16 ++++++++------- 3 files changed, 49 insertions(+), 27 deletions(-) diff --git a/src/ip.rs b/src/ip.rs index dbadbdb..b11de03 100644 --- a/src/ip.rs +++ b/src/ip.rs @@ -18,6 +18,12 @@ lazy_static! { static ref R_DATE: Regex = Regex::new(include_str!("regexps/date.txt")).unwrap(); } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct IpEvent { + pub msgtype: String, + pub ipdata: IpData, +} + #[derive(Clone, Debug, Serialize, Deserialize, Eq)] pub struct IpData { pub ip: String, diff --git a/src/ipblc.rs b/src/ipblc.rs index 1404904..016c80b 100644 --- a/src/ipblc.rs +++ b/src/ipblc.rs @@ -1,7 +1,7 @@ use crate::api::apiserver; use crate::config::{Context, GIT_VERSION}; use crate::fw::{fwblock, fwinit}; -use crate::ip::{filter, IpData}; +use crate::ip::{filter, IpData, IpEvent}; use crate::utils::read_lines; use crate::ws::send_to_ipbl_ws; use crate::zmqcom::{send_to_ipbl_zmq, zmqinit}; @@ -39,31 +39,34 @@ pub async fn run() { let mut fwlen: usize = 0; // initialize zeromq sockets - let (ipdatatx, mut ipdatarx): (Sender, Receiver) = channel(ZMQ_CHAN_SIZE); - let zmqreqsocket = zmqinit(&ctxclone, &ipdatatx).await; + let (ipeventtx, mut ipeventrx): (Sender, Receiver) = channel(ZMQ_CHAN_SIZE); + let zmqreqsocket = zmqinit(&ctxclone, &ipeventtx).await; let mut blrx = watchfiles(&ctxclone).await; let ctxclone = Arc::clone(&ctxarc); tokio::spawn(async move { - compare_files_changes(&ctxclone, &mut blrx, &ipdatatx).await; + compare_files_changes(&ctxclone, &mut blrx, &ipeventtx).await; }); - let mut ip_init = IpData { - ip: "".to_string(), - src: "".to_string(), - date: "".to_string(), - hostname: "".to_string(), - mode: "init".to_string(), + let ipevent_init = IpEvent { + msgtype: String::from("init"), + ipdata: IpData { + ip: "".to_string(), + src: "".to_string(), + date: "".to_string(), + hostname: "".to_string(), + mode: "".to_string(), + }, }; - send_to_ipbl_zmq(&zmqreqsocket, &mut ip_init).await; + send_to_ipbl_zmq(&zmqreqsocket, &ipevent_init).await; loop { let mut ret: Vec = Vec::new(); // wait for logs parse and zmq channel receive //let mut received_ip = ipdatarx.recv(); - let ipdata_wait = ipdatarx.recv(); + let ipdata_wait = ipeventrx.recv(); let apimsg_wait = apirx.recv(); let ctxclone = Arc::clone(&ctxarc); @@ -74,22 +77,29 @@ pub async fn run() { let mut ctx = ctxclone.write().await; - if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() { - for ip_to_send in &mut ctx.get_blocklist_toblock().await { - ip_to_send.mode = "init".to_string(); - send_to_ipbl_zmq(&zmqreqsocket, ip_to_send).await; + if received_ip.ipdata.ip == "".to_string() && received_ip.msgtype == "init".to_string() { + for ip_to_send in ctx.get_blocklist_toblock().await { + let event = IpEvent{ + msgtype: String::from("init"), + ipdata: ip_to_send, + }; + send_to_ipbl_zmq(&zmqreqsocket, &event).await; } continue; } // refresh context blocklist - let filtered_ip = ctx.update_blocklist(&received_ip).await; + let filtered_ip = ctx.update_blocklist(&received_ip.ipdata).await; // send ip list to ws and zmq sockets if let Some(ip) = filtered_ip { println!("sending {} to ws and zmq", ip.ip); send_to_ipbl_ws(&ctx, &ip, &mut ret).await; - send_to_ipbl_zmq(&zmqreqsocket, &ip).await; + let event = IpEvent{ + msgtype: String::from("add"), + ipdata: ip, + }; + send_to_ipbl_zmq(&zmqreqsocket, &event).await; } } _val = apimsg_wait => { @@ -171,7 +181,7 @@ async fn get_last_file_size(w: &mut HashMap, path: &str) -> (u64, b async fn compare_files_changes( ctxarc: &Arc>, inrx: &mut Receiver, - ipdatatx: &Sender, + ipeventtx: &Sender, ) { let mut tnets; loop { @@ -231,7 +241,11 @@ async fn compare_files_changes( } } for ip in iplist { - ipdatatx.send(ip).await.unwrap(); + let ipevent = IpEvent { + msgtype: String::from("file"), + ipdata: ip, + }; + ipeventtx.send(ipevent).await.unwrap(); } } None => {} diff --git a/src/zmqcom.rs b/src/zmqcom.rs index de18769..e87f0dd 100644 --- a/src/zmqcom.rs +++ b/src/zmqcom.rs @@ -1,5 +1,5 @@ use crate::config::{Context, ZMQ}; -use crate::ip::IpData; +use crate::ip::IpEvent; use crate::utils::gethostname; use std::sync::Arc; @@ -21,7 +21,7 @@ pub async fn zconnect<'a>( Ok(socket) } -pub async fn zmqinit(ctx: &Arc>, ipdatatx: &Sender) -> zmq::Socket { +pub async fn zmqinit(ctx: &Arc>, ipeventtx: &Sender) -> zmq::Socket { let ctxarc = Arc::clone(&ctx); let zmqreqsocket; @@ -36,11 +36,11 @@ pub async fn zmqinit(ctx: &Arc>, ipdatatx: &Sender) -> z .unwrap(); } - listenpubsub(&ctx, ipdatatx.clone(), zmqsubsocket).await; + listenpubsub(&ctx, ipeventtx.clone(), zmqsubsocket).await; return zmqreqsocket; } -async fn listenpubsub(ctx: &Arc>, txpubsub: Sender, socket: zmq::Socket) { +async fn listenpubsub(ctx: &Arc>, txpubsub: Sender, socket: zmq::Socket) { let prefix; { let ctx = ctx.read().await; @@ -71,8 +71,10 @@ async fn listenpubsub(ctx: &Arc>, txpubsub: Sender, sock match msgs { Some(ss) => { let msg = ss.strip_prefix(prefix.as_str()).unwrap(); - let tosend: IpData = serde_json::from_str(msg).unwrap(); - if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() { + let tosend: IpEvent = serde_json::from_str(msg).unwrap(); + if tosend.ipdata.hostname != gethostname(true) + || tosend.msgtype == "init".to_string() + { txpubsub.send(tosend).await.unwrap(); } } @@ -82,7 +84,7 @@ async fn listenpubsub(ctx: &Arc>, txpubsub: Sender, sock }); } -pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpData) { +pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent) { let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); match reqsocket.send(&msg, 0) { Ok(_) => {}