use crate::config::{Context, GIT_VERSION}; use crate::fw::{fwblock, fwinit}; use crate::ip::{filter, IpData, IpEvent}; use crate::monitoring::apiserver; use crate::utils::{gethostname, read_lines, sleep_ms}; use crate::webservice::send_to_ipbl_api; use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep}; use chrono::prelude::*; use chrono::prelude::{DateTime, Local}; use chrono::Duration; use nix::sys::inotify::InotifyEvent; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); const BL_CHAN_SIZE: usize = 32; const WS_CHAN_SIZE: usize = 64; pub async fn run() { let globalctx = Context::new().await; let ctxarc = Arc::new(RwLock::new(globalctx)); let mut fwlen: usize = 0; let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let mut last_cfg_reload: DateTime = Local::now().trunc_subsecs(0); println!("Launching {}, version {}", PKG_NAME, pkgversion); fwinit(); let ctxapi = Arc::clone(&ctxarc); apiserver(&ctxapi).await.unwrap(); // initialize sockets let (ipeventtx, mut ipeventrx): (Sender, Receiver) = channel(WS_CHAN_SIZE); let ipeventtxarc = Arc::new(RwLock::new(ipeventtx)); // init pubsub let ctxwsps = Arc::clone(&ctxarc); let ipeventws = Arc::clone(&ipeventtxarc); websocketpubsub(&ctxwsps, ipeventws).await; let ctxwsrr = Arc::clone(&ctxarc); let mut wssocketrr = websocketreqrep(&ctxwsrr).await; // init file watcher let mut blrx = watchfiles(&ctxarc).await; let ctxclone = Arc::clone(&ctxarc); let ipeventclone = Arc::clone(&ipeventtxarc); tokio::spawn(async move { compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await; }); loop { let mut ret: Vec = Vec::new(); let ctxclone = Arc::clone(&ctxarc); tokio::select! { ipevent = ipeventrx.recv() => { let received_ip = ipevent.unwrap(); let mut ctx = ctxclone.write().await; if received_ip.msgtype == "bootstrap".to_string() { for ip_to_send in ctx.get_blocklist_toblock().await { let ipe = IpEvent{ msgtype: String::from("init"), mode: String::from("ws"), hostname: gethostname(true), ipdata: ip_to_send, }; if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { drop(ctx); wssocketrr = websocketreqrep(&ctxwsrr).await; break; } } continue } // refresh context blocklist let filtered_ipevent = ctx.update_blocklist(&received_ip).await; // send ip list to api and ws sockets if let Some(ipevent) = filtered_ipevent { if received_ip.msgtype != "init" { println!("sending {} to api and ws", ipevent.ipdata.ip); let ipe = IpEvent{ msgtype: String::from("add"), mode: String::from("ws"), hostname: gethostname(true), ipdata: ipevent.ipdata, }; send_to_ipbl_api(&ctx.client, &ctx.flags.server, &ipe).await; let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; if !status { drop(ctx); wssocketrr = websocketreqrep(&ctxwsrr).await; continue; } } } } _val = sleep_ms(200) => {} }; let ctxclone = Arc::clone(&ctxarc); handle_fwblock(ctxclone, &mut ret, &mut fwlen).await; // log lines if ret.len() > 0 { println!("{ret}", ret = ret.join(", ")); } let ctxclone = Arc::clone(&ctxarc); handle_cfg_reload(ctxclone, &mut last_cfg_reload).await; } } async fn handle_cfg_reload(ctxclone: Arc>, last_cfg_reload: &mut DateTime) { let now_cfg_reload = Local::now().trunc_subsecs(0); if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(5) { // reload configuration from the server let mut ctx = ctxclone.write().await; match ctx.load().await { Ok(_) => { *last_cfg_reload = Local::now().trunc_subsecs(0); } Err(err) => { println!("error loading config: {err}"); } } }; } async fn handle_fwblock(ctxclone: Arc>, ret: &mut Vec, fwlen: &mut usize) { let toblock; { let mut ctx = ctxclone.write().await; ctx.gc_blocklist().await; toblock = ctx.get_blocklist_toblock().await; // apply firewall blocking match fwblock(&toblock, ret, fwlen) { Ok(_) => {} Err(err) => { println!("Err: {err}, unable to push firewall rules, use super user") } }; } } async fn watchfiles(ctxarc: &Arc>) -> Receiver { let (bltx, blrx): (Sender, Receiver) = channel(BL_CHAN_SIZE); let ctxclone = Arc::clone(ctxarc); tokio::spawn(async move { loop { let events; let instance; { let ctx = ctxclone.read().await; instance = ctx.instance.clone(); } events = instance.read_events().unwrap(); for inevent in events { let date: DateTime = Local::now().trunc_subsecs(0); bltx.send(FileEvent { inevent, date }).await.unwrap(); } } }); blrx } async fn get_last_file_size(w: &mut HashMap, path: &str) -> (u64, bool) { let currentlen = match std::fs::metadata(&path.to_string()) { Ok(u) => u.len().clone(), Err(_) => 0u64, }; let lastlen = match w.insert(path.to_string(), currentlen) { Some(u) => u, None => 0u64, }; (lastlen, lastlen != currentlen) } async fn compare_files_changes( ctxarc: &Arc>, inrx: &mut Receiver, ipeventtx: &Arc>>, ) { let mut tnets; loop { let modfiles = inrx.recv().await.unwrap(); let mut iplist: Vec = vec![]; let sask; let sas; { let ctx = ctxarc.read().await; sas = ctx.clone().sas; sask = sas.keys(); tnets = ctx.cfg.build_trustnets(); } match modfiles.inevent.name { Some(name) => { let filename = name.to_str().unwrap(); for sak in sask { let sa = sas.get(sak).unwrap(); if modfiles.inevent.wd == sa.wd { let handle: String; if sa.filename.as_str() == "" { handle = format!("{}/{}", &sa.fullpath, filename); } else if filename.starts_with(sa.filename.as_str()) { handle = sa.fullpath.to_owned(); } else { continue; } let (filesize, sizechanged); { let mut ctx = ctxarc.write().await; let sa = ctx.sas.get_mut(sak).unwrap(); (filesize, sizechanged) = get_last_file_size(&mut sa.watchedfiles, &handle).await; } if !sizechanged { continue; } match read_lines(&handle, filesize) { Some(lines) => { filter( lines, &mut iplist, &tnets, &sa.regex, &sa.set.src, &modfiles.date, ); } None => {} }; break; } } for ip in iplist { let ipevent = IpEvent { msgtype: String::from("add"), hostname: gethostname(true), mode: String::from("file"), ipdata: ip, }; let ipetx = ipeventtx.write().await; ipetx.send(ipevent).await.unwrap(); } } None => {} } } } pub struct FileEvent { pub inevent: InotifyEvent, pub date: DateTime, } impl std::fmt::Debug for FileEvent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{ie:?}", ie = self.inevent) } }