use crate::api::apiserver; use crate::config::{Context, GIT_VERSION}; use crate::fw::{fwblock, fwinit}; use crate::ip::{filter, IpData, IpEvent}; use crate::utils::{gethostname, read_lines, sleep_ms}; use crate::webservice::send_to_ipbl_api; use crate::websocket::{send_to_ipbl_websocket, websocketinit}; use chrono::prelude::*; use chrono::prelude::{DateTime, Local}; use chrono::Duration; use nix::sys::inotify::InotifyEvent; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); const BL_CHAN_SIZE: usize = 32; const ZMQ_CHAN_SIZE: usize = 64; const API_CHAN_SIZE: usize = 64; pub async fn run() { let fqdn = gethostname(true); let globalctx = Context::new().await; let ctxarc = Arc::new(RwLock::new(globalctx)); let mut ret: Vec = Vec::new(); let mut fwlen: usize = 0; let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let mut last_cfg_reload: DateTime = Local::now().trunc_subsecs(0); println!("Launching {}, version {}", PKG_NAME, pkgversion); fwinit(); let (_apitx, mut apirx): (Sender, Receiver) = channel(API_CHAN_SIZE); let ctxclone = Arc::clone(&ctxarc); apiserver(&ctxclone).await.unwrap(); // initialize sockets let (ipeventtx, mut ipeventrx): (Sender, Receiver) = channel(ZMQ_CHAN_SIZE); let (wspubsubsocket, mut wsreqsocket) = websocketinit(&ctxclone, &ipeventtx).await; let mut blrx = watchfiles(&ctxclone).await; let ctxclone = Arc::clone(&ctxarc); tokio::spawn(async move { compare_files_changes(&ctxclone, &mut blrx, &ipeventtx).await; }); let ctxclone = Arc::clone(&ctxarc); let bootstrap_event = ctxclone.read().await.cfg.bootstrap_event().clone(); send_to_ipbl_websocket(&mut wsreqsocket, &bootstrap_event, &mut ret).await; loop { ret = Vec::new(); // wait for logs parse and zmq channel receive //let mut received_ip = ipdatarx.recv(); let ipdata_wait = ipeventrx.recv(); let apimsg_wait = apirx.recv(); let force_wait = sleep_ms(200); let ctxclone = Arc::clone(&ctxarc); tokio::select! { val = ipdata_wait => { let received_ip = val.unwrap(); let mut ctx = ctxclone.write().await; if received_ip.msgtype == "bootstrap".to_string() { for ip_to_send in ctx.get_blocklist_toblock().await { let ipe = IpEvent{ msgtype: String::from("init"), mode: String::from("zmq"), hostname: fqdn.clone(), ipdata: ip_to_send, }; send_to_ipbl_websocket(&mut wsreqsocket, &ipe, &mut ret).await; } continue } // refresh context blocklist let filtered_ipevent = ctx.update_blocklist(&received_ip).await; // send ip list to api and ws sockets if let Some(ipevent) = filtered_ipevent { if received_ip.msgtype != "init" { println!("sending {} to api and ws", ipevent.ipdata.ip); let event = IpEvent{ msgtype: String::from("add"), mode: String::from("socket"), hostname: fqdn.clone(), ipdata: ipevent.ipdata, }; send_to_ipbl_api(&ctx, &event, &mut ret).await; send_to_ipbl_websocket(&mut wsreqsocket, &event, &mut ret).await; } } } _val = apimsg_wait => {} _val = force_wait => {} }; let toblock; { let mut ctx = ctxarc.write().await; ctx.gc_blocklist().await; toblock = ctx.get_blocklist_toblock().await; } // apply firewall blocking match fwblock(&toblock, &mut ret, &mut fwlen) { Ok(_) => {} Err(err) => { println!("Err: {err}, unable to push firewall rules, use super user") } }; // log lines if ret.len() > 0 { println!("{ret}", ret = ret.join(", ")); } { let now_cfg_reload = Local::now().trunc_subsecs(0); if (now_cfg_reload - last_cfg_reload) > Duration::seconds(5) { // reload configuration from the server let mut ctx = ctxclone.write().await; match ctx.load().await { Ok(_) => { last_cfg_reload = Local::now().trunc_subsecs(0); } Err(err) => { println!("error loading config: {err}"); } } }; } } } async fn watchfiles(ctxarc: &Arc>) -> Receiver { let (bltx, blrx): (Sender, Receiver) = channel(BL_CHAN_SIZE); let ctxclone = Arc::clone(ctxarc); tokio::spawn(async move { loop { let events; let instance; { let ctx = ctxclone.read().await; instance = ctx.instance.clone(); } events = instance.read_events().unwrap(); for inevent in events { let date: DateTime = Local::now().trunc_subsecs(0); bltx.send(FileEvent { inevent, date }).await.unwrap(); } } }); blrx } async fn get_last_file_size(w: &mut HashMap, path: &str) -> (u64, bool) { let currentlen = match std::fs::metadata(&path.to_string()) { Ok(u) => u.len().clone(), Err(_) => 0u64, }; let lastlen = match w.insert(path.to_string(), currentlen) { Some(u) => u, None => 0u64, }; (lastlen, lastlen != currentlen) } async fn compare_files_changes( ctxarc: &Arc>, inrx: &mut Receiver, ipeventtx: &Sender, ) { let mut tnets; loop { let modfiles = inrx.recv().await.unwrap(); let mut iplist: Vec = vec![]; let sask; let sas; { let ctx = ctxarc.read().await; sas = ctx.clone().sas; sask = sas.keys(); tnets = ctx.cfg.build_trustnets(); } match modfiles.inevent.name { Some(name) => { let filename = name.to_str().unwrap(); for sak in sask { let sa = sas.get(sak).unwrap(); if modfiles.inevent.wd == sa.wd { let handle: String; if sa.filename.as_str() == "" { handle = format!("{}/{}", &sa.fullpath, filename); } else if filename.starts_with(sa.filename.as_str()) { handle = sa.fullpath.to_owned(); } else { continue; } let (filesize, sizechanged); { let mut ctx = ctxarc.write().await; let sa = ctx.sas.get_mut(sak).unwrap(); (filesize, sizechanged) = get_last_file_size(&mut sa.watchedfiles, &handle).await; } if !sizechanged { continue; } match read_lines(&handle, filesize) { Some(lines) => { filter( lines, &mut iplist, &tnets, &sa.regex, &sa.set.src, &modfiles.date, ); } None => {} }; break; } } for ip in iplist { let ipevent = IpEvent { msgtype: String::from("add"), hostname: gethostname(true), mode: String::from("file"), ipdata: ip, }; ipeventtx.send(ipevent).await.unwrap(); } } None => {} } } } pub struct FileEvent { pub inevent: InotifyEvent, pub date: DateTime, } impl std::fmt::Debug for FileEvent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{ie:?}", ie = self.inevent) } }