use crate::config::{Context, GIT_VERSION}; use crate::fw::{block, init}; use crate::ip::{filter, push_ip, IpData}; use crate::utils::{gethostname, read_lines, sleep_s}; use crate::zmqcom::zconnect; use chrono::prelude::*; use chrono::prelude::{DateTime, Local}; use chrono::Duration; use nix::sys::inotify::InotifyEvent; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; const BL_CHAN_SIZE: usize = 32; const ZMQ_CHAN_SIZE: usize = 64; pub async fn run() { let ctx = Arc::new(Mutex::new(Context::new().await)); println!( "Launching {}, version {}", env!("CARGO_PKG_NAME"), format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION) ); let (ipdatatx, mut ipdatarx): (Sender, Receiver) = channel(ZMQ_CHAN_SIZE); // initialize the firewall table init(&env!("CARGO_PKG_NAME").to_string()); let mut fwlen: usize = 0; // initialize zeromq sockets let reqsocket; let subsocket; { let ctxarc = Arc::clone(&ctx); let zmqctx = ctxarc.lock().await; reqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ) .await .unwrap(); subsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB) .await .unwrap(); } listenpubsub(&ctx, ipdatatx.clone(), subsocket).await; let mut blrx = watchfiles(&ctx).await; let ctxarc = Arc::clone(&ctx); tokio::spawn(async move { compare_files_changes(&ctxarc, &mut blrx, &ipdatatx).await; }); let mut ip_init = IpData { ip: "".to_string(), src: "".to_string(), date: "".to_string(), hostname: "".to_string(), mode: "init".to_string(), }; send_to_ipbl_zmq(&reqsocket, &mut ip_init).await; loop { let mut ret: Vec = Vec::new(); let begin: DateTime = Local::now().trunc_subsecs(0); // wait for logs parse and zmq channel receive let mut received_ip = ipdatarx.recv().await.unwrap(); // lock the context mutex let ctxarc = Arc::clone(&ctx); let mut ctx = ctxarc.lock().await; if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() { for ip_to_send in &mut ctx.get_blocklist_toblock().await { ip_to_send.mode = "init".to_string(); send_to_ipbl_zmq(&reqsocket, ip_to_send).await; } continue; } // refresh context blocklist let filtered_ip = ctx.update_blocklist(&mut received_ip).await; ctx.gc_blocklist().await; // send ip list to ws and zmq sockets if let Some(mut ip) = filtered_ip { send_to_ipbl_ws(&ctx, &mut ip, &mut ret).await; send_to_ipbl_zmq(&reqsocket, &mut ip).await; } // apply firewall blocking block( &env!("CARGO_PKG_NAME").to_string(), &ctx.get_blocklist_toblock().await, &mut ret, &mut fwlen, ) .unwrap(); // log lines if ret.len() > 0 { println!("{ret}", ret = ret.join(", ")); } let end: DateTime = Local::now().trunc_subsecs(0); if (end - begin) > Duration::seconds(5) { // reload configuration from the server match ctx.load().await { Ok(_) => {} Err(err) => { println!("error loading config: {err}"); } } } } } async fn watchfiles(ctx: &Arc>) -> Receiver { let (bltx, blrx): (Sender, Receiver) = channel(BL_CHAN_SIZE); let ctx = Arc::clone(ctx); tokio::spawn(async move { loop { let events: Vec; { let c = ctx.lock().await; let instance = c.instance.clone(); drop(c); events = instance.read_events().unwrap(); } for inevent in events { let date: DateTime = Local::now().trunc_subsecs(0); bltx.send(FileEvent { inevent, date }).await.unwrap(); } } }); blrx } async fn get_last_file_size(w: &mut HashMap, path: &str) -> (u64, bool) { let currentlen = match std::fs::metadata(&path.to_string()) { Ok(u) => u.len().clone(), Err(_) => 0u64, }; let lastlen = match w.insert(path.to_string(), currentlen) { Some(u) => u, None => 0u64, }; (lastlen, lastlen != currentlen) } async fn compare_files_changes( ctx: &Arc>, inrx: &mut Receiver, ipdatatx: &Sender, ) { let mut tnets; loop { let modfiles = inrx.recv().await.unwrap(); let mut iplist: Vec = vec![]; let mut ctx = ctx.lock().await; tnets = ctx.cfg.build_trustnets(); match modfiles.inevent.name { Some(name) => { let filename = name.to_str().unwrap(); for sak in &mut ctx.clone().sas.keys() { let sa = &mut ctx.sas.get_mut(sak).unwrap(); if modfiles.inevent.wd == sa.wd { let handle: String; if sa.filename.as_str() == "" { handle = format!("{}/{}", &sa.fullpath, filename); } else if filename.starts_with(sa.filename.as_str()) { handle = sa.fullpath.to_owned(); } else { continue; } let (filesize, sizechanged) = get_last_file_size(&mut sa.watchedfiles, &handle).await; if !sizechanged { continue; } match read_lines(&handle, filesize) { Some(lines) => { filter( lines, &mut iplist, &tnets, &sa.regex, &sa.set.src, &modfiles.date, ); } None => {} }; break; } } for ip in iplist { ipdatatx.send(ip).await.unwrap(); } } None => {} } } } pub struct FileEvent { pub inevent: InotifyEvent, pub date: DateTime, } impl std::fmt::Debug for FileEvent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{ie:?}", ie = self.inevent) } } async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) { let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); match reqsocket.send(&msg, 0) { Ok(_) => {} Err(e) => { println!("{e:?}") } }; match reqsocket.recv_string(0) { Ok(o) => match o { Ok(_) => {} Err(ee) => { println!("{ee:?}") } }, Err(e) => { println!("{e:?}") } }; } async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec) { ret.push(format!("host: {hostname}", hostname = ctx.hostname)); loop { match push_ip(&ctx, &ip, ret).await { Ok(_) => { break; } Err(err) => { println!("{err}"); sleep_s(1); } }; } } async fn listenpubsub(ctx: &Arc>, txpubsub: Sender, socket: zmq::Socket) { let ctx = ctx.lock().await; let prefix = format!( "{sub} ", sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription ); socket .set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes()) .expect("failed setting subscription"); drop(ctx); tokio::spawn(async move { loop { let msgs: Option = match socket.recv_string(0) { Ok(s) => match s { Ok(ss) => Some(ss), Err(e) => { println!("{e:?}"); None } }, Err(e) => { println!("{e:?}"); None } }; match msgs { Some(ss) => { let msg = ss.strip_prefix(prefix.as_str()).unwrap(); let tosend: IpData = serde_json::from_str(msg).unwrap(); if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() { txpubsub.send(tosend).await.unwrap(); } } None => {} }; } }); }