use crate::ip::{BlockIpData, IpData, IpEvent}; use crate::utils::{gethostname, sleep_s}; use chrono::prelude::*; use chrono::Duration; use clap::{Arg, ArgAction, ArgMatches, Command}; use git_version::git_version; use ipnet::IpNet; use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify, WatchDescriptor}; use regex::Regex; use reqwest::{Client, Error as ReqError, Response}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::hash::{Hash, Hasher}; use std::path::Path; pub const GIT_VERSION: &str = git_version!(); const MASTERSERVER: &str = "ipbl.paulbsd.com"; const WSSUBSCRIPTION: &str = "ipbl"; const CONFIG_RETRY: u64 = 1; const WEB_CLIENT_TIMEOUT: i64 = 5; #[derive(Debug, Clone)] pub struct Context { pub blocklist: HashMap, pub cfg: Config, pub discovery: Discovery, pub flags: Flags, pub hostname: String, pub instance: Box, pub sas: HashMap, pub hashwd: HashMap, } #[derive(Debug, Clone)] pub struct SetMap { pub filename: String, pub fullpath: String, pub regex: Regex, pub set: Set, pub watchedfiles: HashMap, pub wd: WatchDescriptor, } #[derive(Debug, Clone)] pub struct Flags { pub debug: bool, pub server: String, } impl Context { pub async fn new() -> Self { // Get flags let argp: ArgMatches = Context::argparse(); //let debug: bool = argp.contains_id("debug"); let debug: bool = argp.get_one::("debug").unwrap().to_owned(); let server: String = argp.get_one::("server").unwrap().to_string(); // Build context let mut ctx = Context { cfg: Config::new(), flags: Flags { debug, server }, hostname: gethostname(true), discovery: Discovery { version: "1.0".to_string(), urls: HashMap::new(), }, sas: HashMap::new(), instance: Box::new(Inotify::init(InitFlags::empty()).unwrap()), blocklist: HashMap::new(), hashwd: HashMap::new(), }; print!("Loading config ... "); ctx.load().await.unwrap(); ctx } pub fn argparse() -> ArgMatches { Command::new(env!("CARGO_PKG_NAME")) .version(format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION)) .author(env!("CARGO_PKG_AUTHORS")) .about(env!("CARGO_PKG_DESCRIPTION")) .arg( Arg::new("server") .short('s') .long("server") .value_name("server") .default_value(format!("https://{MASTERSERVER}")) .help("Sets a http server"), ) .arg( Arg::new("debug") .short('d') .help("Enable debugging") .action(ArgAction::SetTrue), ) .get_matches() } #[allow(dead_code)] pub async fn discovery(&self) -> Result { let resp: Result = httpclient() .get(format!("{server}/discovery", server = self.flags.server)) .send() .await; let req = match resp { Ok(re) => re, Err(err) => return Err(err), }; let data: Discovery = match req.json().await { Ok(res) => res, Err(err) => return Err(err), }; Ok(data) } pub async fn load(&mut self) -> Result<(), Box> { if cfg!(test) { return Ok(()); } let mut last_in_err = false; loop { let res = self.cfg.load(&self.flags.server).await; match res { Ok(()) => { if last_in_err { println!("loaded config"); } break; } Err(err) => { println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs"); last_in_err = true; sleep_s(CONFIG_RETRY).await; } }; } if last_in_err { println!("creating sas"); } self.create_sas().await?; if last_in_err { println!("created sas"); } Ok(()) } #[cfg(test)] pub async fn get_blocklist_pending(&self) -> Vec { let mut res: Vec = vec![]; for (_, v) in self.blocklist.iter() { res.push(v.ipdata.clone()); } res } pub async fn get_blocklist_toblock(&mut self) -> Vec { let mut res: Vec = vec![]; for (_, block) in self.blocklist.iter_mut() { match self.cfg.sets.get(&block.ipdata.src) { Some(set) => { if block.tryfail >= set.tryfail { res.push(block.ipdata.clone()); } } None => {} } } res } pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option { match self.cfg.sets.get(&ipevent.ipdata.src) { Some(set) => { let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str()) .unwrap() .with_timezone(&chrono::Local); let blocktime = set.blocktime; if ipevent.mode == "file".to_string() && self.hostname == ipevent.hostname { let block = self .blocklist .entry(ipevent.ipdata.ip.to_string()) .or_insert(BlockIpData { ipdata: ipevent.ipdata.clone(), tryfail: 0, starttime, blocktime, }); block.tryfail += 1; block.blocktime = blocktime; if block.tryfail >= set.tryfail { return Some(ipevent.clone()); } } else { self.blocklist .entry(ipevent.ipdata.ip.to_string()) .or_insert(BlockIpData { ipdata: ipevent.ipdata.clone(), tryfail: set.tryfail, starttime, blocktime, }); } } None => {} } None } pub async fn gc_blocklist(&mut self) -> Vec { let mut removed: Vec = vec![]; let now: DateTime = Local::now().trunc_subsecs(0); // nightly, future use // let drained: HashMap = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate) for (ip, blocked) in self.blocklist.clone().iter() { /*match self.cfg.sets.get(&blocked.ipdata.src) { Some(set) => { let mut block = self.blocklist.get_mut(ip).unwrap(); block.blocktime = set.blocktime.clone(); } None => {} }*/ let mindate = now - Duration::minutes(blocked.blocktime); if blocked.starttime < mindate { self.blocklist.remove(&ip.clone()).unwrap(); removed.push(blocked.ipdata.clone()); } } removed } pub async fn create_sas(&mut self) -> Result<(), Box> { for (src, set) in self.cfg.sets.iter() { let p = Path::new(set.path.as_str()); if p.is_dir() { let wd = match self.hashwd.get(&set.path.to_string()) { Some(wd) => *wd, None => { let res = self .instance .add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY) .unwrap(); self.hashwd.insert(set.path.to_string(), res); res } }; let fullpath: String = match set.filename.as_str() { "" => set.path.clone(), _ => { format!( "{path}/{filename}", path = set.path, filename = set.filename.clone() ) } }; match self.sas.get_mut(&src.clone()) { Some(s) => { s.filename = set.filename.clone(); s.fullpath = fullpath; s.set = set.clone(); s.regex = Regex::new(set.regex.as_str()).unwrap(); } None => { self.sas.insert( src.clone(), SetMap { filename: set.filename.clone(), fullpath, set: set.clone(), regex: Regex::new(set.regex.as_str()).unwrap(), wd, watchedfiles: HashMap::new(), }, ); } } } } Ok(()) } } #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Config { pub sets: HashMap, #[serde(skip_serializing)] pub trustnets: Vec, pub ws: HashMap, pub api: String, } impl Config { pub fn new() -> Self { Self { sets: HashMap::from([ ("smtp".to_string(), Set { src: "smtp".to_string(), filename: "mail.log".to_string(), regex: "(SASL LOGIN authentication failed)".to_string(), path: "/var/log".to_string(), blocktime: 60, tryfail: 5, }), ("ssh".to_string(), Set { src: "ssh".to_string(), filename: "auth.log".to_string(), regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(), path: "/var/log".to_string(), blocktime: 60, tryfail: 5, },), ("http".to_string(), Set { src: "http".to_string(), filename: "".to_string(), regex: "(anonymousfox.co)".to_string(), path: "/var/log/nginx".to_string(), blocktime: 60, tryfail: 5, },), ("openvpn".to_string(), Set { src: "openvpn".to_string(), filename: "status".to_string(), regex: "(UNDEF)".to_string(), path: "/var/run/openvpn".to_string(), blocktime: 60, tryfail: 5, },), ]), trustnets: vec![ "127.0.0.0/8".to_string(), "10.0.0.0/8".to_string(), "172.16.0.0/12".to_string(), "192.168.0.0/16".to_string(), ], ws: HashMap::from([("pubsub".to_string(),WebSocketCfg{ t: "pubsub".to_string(), endpoint: format!("wss://{}/wsps", MASTERSERVER.to_string()), subscription: WSSUBSCRIPTION.to_string(), }),("reqrep".to_string(), WebSocketCfg { t: "reqrep".to_string(), endpoint: format!("wss://{}/wsrr", MASTERSERVER.to_string()), subscription: WSSUBSCRIPTION.to_string(), })]), api: String::from("127.0.0.1:8060") } } pub async fn load(&mut self, server: &String) -> Result<(), ReqError> { self.get_global_config(server).await?; self.get_trustnets(server).await?; self.get_sets(server).await?; self.get_ws_config(server).await?; Ok(()) } async fn get_global_config(&mut self, server: &String) -> Result<(), ReqError> { let resp: Result = httpclient().get(format!("{server}/config")).send().await; let req = match resp { Ok(re) => re, Err(err) => return Err(err), }; let data: HashMap = match req.json::>().await { Ok(res) => { let mut out: HashMap = HashMap::new(); res.into_iter().map(|x| x).for_each(|x| { out.insert(x.key.to_string(), x); }); out } Err(err) => return Err(err), }; let key = "".to_string(); self.api = data .get(&key.to_string()) .unwrap_or(&GlobalConfig { key: "api".to_string(), value: "127.0.0.1:8060".to_string(), }) .value .clone(); Ok(()) } async fn get_trustnets(&mut self, server: &String) -> Result<(), ReqError> { let resp: Result = httpclient() .get(format!("{server}/config/trustlist")) .send() .await; let req = match resp { Ok(re) => re, Err(err) => return Err(err), }; let data: Vec = match req.json::>().await { Ok(res) => res, Err(err) => return Err(err), }; self.trustnets = data; Ok(()) } async fn get_sets(&mut self, server: &String) -> Result<(), ReqError> { let resp: Result = httpclient() .get(format!("{server}/config/sets")) .send() .await; let req = match resp { Ok(re) => re, Err(err) => return Err(err), }; let data: Vec = match req.json::>().await { Ok(res) => res, Err(err) => return Err(err), }; for d in data { self.sets.insert(d.src.clone(), d); } Ok(()) } async fn get_ws_config(&mut self, server: &String) -> Result<(), ReqError> { let resp: Result = httpclient().get(format!("{server}/config/ws")).send().await; let req = match resp { Ok(re) => re, Err(err) => return Err(err), }; let data: HashMap = match req.json::>().await { Ok(res) => { let mut out: HashMap = HashMap::new(); res.into_iter().map(|x| x).for_each(|x| { out.insert(x.t.to_string(), x); }); out } Err(err) => return Err(err), }; self.ws = data; Ok(()) } pub fn build_trustnets(&self) -> Vec { let mut trustnets: Vec = vec![]; for trustnet in &self.trustnets { match trustnet.parse() { Ok(net) => trustnets.push(net), Err(err) => { println!("error parsing {trustnet}, error: {err}"); } }; } trustnets } pub fn bootstrap_event(&self) -> IpEvent { IpEvent { msgtype: String::from("bootstrap"), mode: String::from("ws"), hostname: gethostname(true), ipdata: IpData { ip: "".to_string(), src: "".to_string(), date: "".to_string(), hostname: "".to_string(), }, } } } pub fn httpclient() -> Client { let client = Client::builder() .user_agent(format!( "{}/{}@{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"), GIT_VERSION, gethostname(false) )) .timeout(Duration::seconds(WEB_CLIENT_TIMEOUT).to_std().unwrap()) .build() .unwrap(); client } #[derive(Debug, Deserialize, Serialize, Clone)] pub struct GlobalConfig { pub key: String, pub value: String, } #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Set { pub src: String, pub filename: String, pub regex: String, pub path: String, pub blocktime: i64, pub tryfail: i64, } #[derive(Debug, Deserialize, Serialize, Clone)] pub struct WebSocketCfg { #[serde(rename = "type")] pub t: String, pub endpoint: String, pub subscription: String, } #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Discovery { pub version: String, pub urls: HashMap, } #[derive(Debug, Deserialize, Serialize, Clone)] pub struct URL { pub key: String, pub path: String, } impl PartialEq for Set { fn eq(&self, other: &Self) -> bool { self.src == other.src } } impl Hash for Set { fn hash(&self, state: &mut H) { self.src.hash(state); } } #[cfg(test)] mod test { use super::*; use crate::ip::*; use Context; pub async fn prepare_test_data() -> Context { let mut ctx = Context::new().await; let now: DateTime = Local::now().trunc_subsecs(0); ctx.blocklist = HashMap::new(); for _i in 0..10 { ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), ipdata: IpData { ip: "1.1.1.1".to_string(), hostname: "test1".to_string(), date: now.to_rfc3339().to_string(), src: "ssh".to_string(), }, }) .await; } for _ in 0..10 { ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), ipdata: IpData { ip: "1.1.1.2".to_string(), hostname: "test2".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), }, }) .await; } ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), ipdata: IpData { ip: "1.1.1.3".to_string(), hostname: "testgood".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), }, }) .await; ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), ipdata: IpData { ip: "1.1.1.4".to_string(), hostname: "testgood".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), }, }) .await; ctx.update_blocklist(&mut IpEvent { msgtype: String::from("add"), mode: String::from("ws"), hostname: String::from("localhost"), ipdata: IpData { ip: "1.1.1.4".to_string(), hostname: "testgood".to_string(), date: now.to_rfc3339().to_string(), src: "http".to_string(), }, }) .await; let mut ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap(); ip1.starttime = DateTime::from(now) - Duration::minutes(61); let mut ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap(); ip2.starttime = DateTime::from(now) - Duration::minutes(62); ctx } #[tokio::test] pub async fn test_blocklist_pending() { let ctx = prepare_test_data().await; let pending = ctx.get_blocklist_pending().await; assert_eq!(pending.len(), 4); let ips = ["1.1.1.1", "1.1.1.2", "1.1.1.3", "1.1.1.4"]; for i in ips { let ip = ctx .blocklist .get(&i.to_string()) .unwrap() .ipdata .ip .as_str(); assert_eq!(ip, i); } } #[tokio::test] pub async fn test_blocklist_toblock() { let mut ctx = prepare_test_data().await; ctx.gc_blocklist().await; let toblock = ctx.get_blocklist_toblock().await; assert_eq!(toblock.len(), 2); } #[tokio::test] pub async fn test_blocklist_gc() { let mut ctx = prepare_test_data().await; let after_gc = ctx.gc_blocklist().await; assert_eq!(after_gc.len(), 2); let ips = &["1.1.1.3", "1.1.1.4"]; for ip in ips { let ipstring = ip.to_string(); assert_eq!(ctx.blocklist.get(&ipstring).unwrap().ipdata.ip, ipstring); } } }