use crate::config::{Context, GIT_VERSION}; use crate::fw::{fwblock, fwglobalinit}; use crate::ip::{filter, IpData, IpEvent}; use crate::ipevent; use crate::monitoring::apiserver; use crate::utils::{gethostname, read_lines, sleep_s}; use crate::webservice::send_to_ipbl_api; use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep}; use chrono::prelude::*; use chrono::prelude::{DateTime, Local}; use chrono::Duration; use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); const BL_CHAN_SIZE: usize = 32; const WS_CHAN_SIZE: usize = 64; const LOOP_MAX_WAIT: u64 = 5; pub async fn run() { let inotify = Inotify::init(InitFlags::empty()).unwrap(); let globalctx = Context::new(&inotify).await; let ctxarc = Arc::new(RwLock::new(globalctx)); let mut fwlen: usize = 0; let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let mut last_cfg_reload: DateTime = Local::now().trunc_subsecs(0); println!("Launching {}, version {}", PKG_NAME, pkgversion); fwglobalinit(); let ctxapi = Arc::clone(&ctxarc); apiserver(&ctxapi).await.unwrap(); // initialize sockets let (ipeventtx, mut ipeventrx): (Sender, Receiver) = channel(WS_CHAN_SIZE); let ipeventtxarc = Arc::new(RwLock::new(ipeventtx)); // init pubsub let ctxwsps = Arc::clone(&ctxarc); let ipeventws = Arc::clone(&ipeventtxarc); websocketpubsub(&ctxwsps, ipeventws).await; let ctxwsrr = Arc::clone(&ctxarc); let mut wssocketrr = websocketreqrep(&ctxwsrr).await; // init file watcher let inoarc = Arc::new(RwLock::new(inotify)); let inoclone = Arc::clone(&inoarc); let mut blrx = watchfiles(inoclone).await; let ctxclone = Arc::clone(&ctxarc); let ipeventclone = Arc::clone(&ipeventtxarc); tokio::spawn(async move { compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await; }); loop { let mut ret: Vec = Vec::new(); let ctxclone = Arc::clone(&ctxarc); tokio::select! { ipevent = ipeventrx.recv() => { let received_ip = ipevent.unwrap(); let (toblock,server) = { let ctx = ctxclone.read().await; (ctx.get_blocklist_toblock().await,ctx.flags.server.clone()) }; if received_ip.msgtype == "bootstrap".to_string() { for ip_to_send in toblock { let ipe = ipevent!("init","ws",gethostname(true),ip_to_send); if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { wssocketrr = websocketreqrep(&ctxwsrr).await; break; } } continue } // refresh context blocklist let filtered_ipevent = { ctxarc.write().await.update_blocklist(&received_ip).await }; // send ip list to api and ws sockets if let Some(ipevent) = filtered_ipevent { if received_ip.msgtype != "init" { println!("sending {} to api and ws", ipevent.ipdata.ip); let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata); send_to_ipbl_api(&server.clone(), &ipe).await; let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; if !status { wssocketrr = websocketreqrep(&ctxwsrr).await; continue; } } } } _val = sleep_s(LOOP_MAX_WAIT) => {} }; let ctxclone = Arc::clone(&ctxarc); handle_fwblock(ctxclone, &mut ret, &mut fwlen).await; // log lines if ret.len() > 0 { println!("{ret}", ret = ret.join(", ")); } let ctxclone = Arc::clone(&ctxarc); let inoclone = Arc::clone(&inoarc); handle_cfg_reload(&ctxclone, &mut last_cfg_reload, inoclone).await; } } async fn handle_cfg_reload( ctxclone: &Arc>, last_cfg_reload: &mut DateTime, inoarc: Arc>, ) { let now_cfg_reload = Local::now().trunc_subsecs(0); if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) { let inotify = inoarc.read().await; match ctxclone.write().await.load(&inotify).await { Ok(_) => { *last_cfg_reload = Local::now().trunc_subsecs(0); } Err(_) => { println!("error reloading config"); } } }; } async fn handle_fwblock(ctxclone: Arc>, ret: &mut Vec, fwlen: &mut usize) { { let mut ctx = ctxclone.write().await; ctx.gc_blocklist().await; } let toblock = { let ctx = ctxclone.read().await; ctx.get_blocklist_toblock().await }; // apply firewall blocking match fwblock(&toblock, ret, fwlen) { Ok(_) => {} Err(err) => { println!("Err: {err}, unable to push firewall rules, use super user") } }; } async fn watchfiles(inoarc: Arc>) -> Receiver { let (bltx, blrx): (Sender, Receiver) = channel(BL_CHAN_SIZE); tokio::spawn(async move { loop { let events = inoarc.read().await.read_events().unwrap(); for inevent in events { let date: DateTime = Local::now().trunc_subsecs(0); bltx.send(FileEvent { inevent, date }).await.unwrap(); } } }); blrx } async fn get_last_file_size(w: &mut HashMap, path: &str) -> (u64, bool) { let currentlen = match std::fs::metadata(&path.to_string()) { Ok(u) => u.len(), Err(_) => 0u64, }; let lastlen = match w.insert(path.to_string(), currentlen) { Some(u) => u, None => currentlen, }; (lastlen, lastlen != currentlen) } async fn compare_files_changes( ctxarc: &Arc>, inrx: &mut Receiver, ipeventtx: &Arc>>, ) { let mut tnets; loop { let modfiles = inrx.recv().await.unwrap(); let mut iplist: Vec = vec![]; let sas = { let ctx = ctxarc.read().await; tnets = ctx.cfg.build_trustnets(); ctx.sas.clone() }; match modfiles.inevent.name { Some(name) => { let filename = name.to_str().unwrap(); for (sak, sa) in sas.clone().iter_mut() { if modfiles.inevent.wd == sa.wd { let handle: String; if sa.filename.as_str() == "" { handle = format!("{}/{}", &sa.fullpath, filename); } else if filename.starts_with(sa.filename.as_str()) { handle = sa.fullpath.to_owned(); } else { continue; } let (filesize, sizechanged) = { let mut ctx = ctxarc.write().await; let sa = ctx.sas.get_mut(sak).unwrap(); get_last_file_size(&mut sa.watchedfiles, &handle).await }; if !sizechanged { continue; } match read_lines(&handle, filesize) { Some(lines) => { filter( lines, &mut iplist, &tnets, &sa.regex, &sa.set.src, &modfiles.date, ); } None => {} }; break; } } for ip in iplist { let ipe = ipevent!("add", "file", gethostname(true), ip); let ipetx = ipeventtx.read().await; ipetx.send(ipe).await.unwrap(); } } None => {} } } } pub struct FileEvent { pub inevent: InotifyEvent, pub date: DateTime, } impl std::fmt::Debug for FileEvent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{ie:?}", ie = self.inevent) } }