diff --git a/src/ipblc/full.rs b/old/full.rs similarity index 100% rename from src/ipblc/full.rs rename to old/full.rs diff --git a/src/config/mod.rs b/src/config.rs similarity index 99% rename from src/config/mod.rs rename to src/config.rs index 3dc0275..adc7d31 100644 --- a/src/config/mod.rs +++ b/src/config.rs @@ -1,5 +1,5 @@ use crate::ip::{BlockIpData, IpData}; -use crate::utils::*; +use crate::utils::{gethostname, sleep_s}; use chrono::prelude::*; use chrono::Duration; @@ -90,7 +90,7 @@ impl Context { } Err(err) => { println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs"); - sleep(CONFIG_RETRY); + sleep_s(CONFIG_RETRY); } } } @@ -99,7 +99,7 @@ impl Context { pub fn argparse() -> ArgMatches { Command::new(env!("CARGO_PKG_NAME")) - .version(format!("{}/{}", env!("CARGO_PKG_VERSION"), GIT_VERSION).as_str()) + .version(format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION).as_str()) .author(env!("CARGO_PKG_AUTHORS")) .about(env!("CARGO_PKG_DESCRIPTION")) .arg( diff --git a/src/firewall/mod.rs b/src/fw.rs similarity index 97% rename from src/firewall/mod.rs rename to src/fw.rs index 947e0a3..983bf5e 100644 --- a/src/firewall/mod.rs +++ b/src/fw.rs @@ -36,6 +36,9 @@ pub fn block( // add chain batch.add(&chain, nftnl::MsgType::Add); + let rule = Rule::new(&chain); + batch.add(&rule, nftnl::MsgType::Del); + let mut rule = Rule::new(&chain); rule.add_expr(&nft_expr!(ct state)); rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32)); diff --git a/src/ip.rs b/src/ip.rs index af5fc76..e8b1594 100644 --- a/src/ip.rs +++ b/src/ip.rs @@ -146,7 +146,7 @@ pub fn filter( trustnets: &Vec, regex: &Regex, src: &String, - lastprocess: &DateTime, + last: &DateTime, ) -> isize { let mut ips = 0; let hostname = gethostname(true); @@ -160,15 +160,14 @@ pub fn filter( s_ipaddr = sv4.get(0).unwrap().as_str().to_string(); } None => { - continue; - /*match R_IPV6.captures(l.as_str()) { + match R_IPV6.captures(l.as_str()) { Some(sv6) => { s_ipaddr = sv6.get(0).unwrap().as_str().to_string(); } None => { continue; } - };*/ + }; } }; @@ -176,7 +175,7 @@ pub fn filter( match R_DATE.captures(l.as_str()) { Some(sdt) => { s_date = parse_date(sdt); - if &s_date < lastprocess { + if &s_date < last { continue; } } diff --git a/src/ipblc/inc.rs b/src/ipblc.rs similarity index 67% rename from src/ipblc/inc.rs rename to src/ipblc.rs index 88a8578..404bbe5 100644 --- a/src/ipblc/inc.rs +++ b/src/ipblc.rs @@ -1,7 +1,13 @@ -use super::*; +use crate::config::{Context, GIT_VERSION}; +use crate::fw; +use crate::ip::{filter, push_ip, IpData}; +use crate::utils::{gethostname, read_lines, sleep_s}; +use crate::zmqcom::*; use chrono::prelude::*; +use chrono::prelude::{DateTime, Local}; use chrono::Duration; +use nix::sys::inotify::InotifyEvent; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -10,17 +16,18 @@ use tokio::sync::Mutex; const BL_CHAN_SIZE: usize = 32; const ZMQ_CHAN_SIZE: usize = 64; -pub async fn process(ctx: &Arc>) { +pub async fn run() { + let ctx = Arc::new(Mutex::new(Context::new().await)); println!( - "Launching {} version {}", + "Launching {}, version {}", env!("CARGO_PKG_NAME"), - env!("CARGO_PKG_VERSION") + format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION) ); let (ipdatatx, mut ipdatarx): (Sender, Receiver) = channel(ZMQ_CHAN_SIZE); // initialize the firewall table - firewall::init(&env!("CARGO_PKG_NAME").to_string()); + fw::init(&env!("CARGO_PKG_NAME").to_string()); let mut fwlen: usize = 0; // initialize zeromq sockets @@ -85,7 +92,7 @@ pub async fn process(ctx: &Arc>) { } // apply firewall blocking - firewall::block( + fw::block( &env!("CARGO_PKG_NAME").to_string(), &ctx.get_blocklist_toblock().await, &mut ret, @@ -204,3 +211,90 @@ async fn compare_files_changes( } } } + +pub struct FileEvent { + pub inevent: InotifyEvent, + pub date: DateTime, +} + +impl std::fmt::Debug for FileEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{ie:?}", ie = self.inevent) + } +} + +async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) { + let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); + match reqsocket.send(&msg, 0) { + Ok(_) => {} + Err(e) => { + println!("{e:?}") + } + }; + match reqsocket.recv_string(0) { + Ok(o) => match o { + Ok(_) => {} + Err(ee) => { + println!("{ee:?}") + } + }, + Err(e) => { + println!("{e:?}") + } + }; +} + +async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec) { + ret.push(format!("host: {hostname}", hostname = ctx.hostname)); + loop { + match push_ip(&ctx, &ip, ret).await { + Ok(_) => { + break; + } + Err(err) => { + println!("{err}"); + sleep_s(1); + } + }; + } +} + +async fn listenpubsub(ctx: &Arc>, txpubsub: Sender, socket: zmq::Socket) { + let ctx = ctx.lock().await; + let prefix = format!( + "{sub} ", + sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription + ); + socket + .set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes()) + .expect("failed setting subscription"); + drop(ctx); + + tokio::spawn(async move { + loop { + let msgs: Option = match socket.recv_string(0) { + Ok(s) => match s { + Ok(ss) => Some(ss), + Err(e) => { + println!("{e:?}"); + None + } + }, + Err(e) => { + println!("{e:?}"); + None + } + }; + match msgs { + Some(ss) => { + let msg = ss.strip_prefix(prefix.as_str()).unwrap(); + let tosend: IpData = serde_json::from_str(msg).unwrap(); + if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() { + txpubsub.send(tosend).await.unwrap(); + } + } + None => {} + }; + } + }); +} diff --git a/src/ipblc/mod.rs b/src/ipblc/mod.rs deleted file mode 100644 index 973909c..0000000 --- a/src/ipblc/mod.rs +++ /dev/null @@ -1,100 +0,0 @@ -pub mod inc; - -use crate::config::*; -use crate::firewall; -use crate::ip::*; -use crate::utils::*; -use crate::zmqcom::*; - -use chrono::prelude::{DateTime, Local}; -use nix::sys::inotify::InotifyEvent; -use std::sync::Arc; -use tokio::sync::mpsc::Sender; -use tokio::sync::Mutex; - -pub struct FileEvent { - pub inevent: InotifyEvent, - pub date: DateTime, -} - -impl std::fmt::Debug for FileEvent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{ie:?}", ie = self.inevent) - } -} - -async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) { - let msg = format!("{value}", value = serde_json::to_string(&ip).unwrap()); - match reqsocket.send(&msg, 0) { - Ok(_) => {} - Err(e) => { - println!("{e:?}") - } - }; - match reqsocket.recv_string(0) { - Ok(o) => match o { - Ok(_) => {} - Err(ee) => { - println!("{ee:?}") - } - }, - Err(e) => { - println!("{e:?}") - } - }; -} - -async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec) { - ret.push(format!("host: {hostname}", hostname = ctx.hostname)); - loop { - match push_ip(&ctx, &ip, ret).await { - Ok(_) => { - break; - } - Err(err) => { - println!("{err}"); - sleep(1); - } - }; - } -} - -async fn listenpubsub(ctx: &Arc>, txpubsub: Sender, socket: zmq::Socket) { - let ctx = ctx.lock().await; - let prefix = format!( - "{sub} ", - sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription - ); - socket - .set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes()) - .expect("failed setting subscription"); - drop(ctx); - - tokio::spawn(async move { - loop { - let msgs: Option = match socket.recv_string(0) { - Ok(s) => match s { - Ok(ss) => Some(ss), - Err(e) => { - println!("{e:?}"); - None - } - }, - Err(e) => { - println!("{e:?}"); - None - } - }; - match msgs { - Some(ss) => { - let msg = ss.strip_prefix(prefix.as_str()).unwrap(); - let tosend: IpData = serde_json::from_str(msg).unwrap(); - if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() { - txpubsub.send(tosend).await.unwrap(); - } - } - None => {} - }; - } - }); -} diff --git a/src/main.rs b/src/main.rs index d3ae296..30c6b8e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,12 @@ mod config; -mod firewall; +mod fw; mod ip; mod ipblc; mod utils; mod zmqcom; -use config::Context; -use std::sync::Arc; -use tokio::sync::Mutex; - #[tokio::main] pub async fn main() { // Create a new context - let ctx = Arc::new(Mutex::new(Context::new().await)); - ipblc::inc::process(&ctx).await; + ipblc::run().await; } diff --git a/src/utils.rs b/src/utils.rs index 92f8f6a..a27c9cd 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -33,8 +33,12 @@ pub fn _dedup(list: &mut Vec) -> usize { list.len() } -pub fn sleep(seconds: u64) { - std::thread::sleep(Duration::from_secs(seconds)); +pub fn sleep_ms(ms: u64) { + std::thread::sleep(Duration::from_millis(ms)); +} + +pub fn sleep_s(s: u64) { + std::thread::sleep(Duration::from_secs(s)); } pub fn gethostname(show_fqdn: bool) -> String { diff --git a/src/zmqcom.rs b/src/zmqcom.rs index 663da78..108b7cc 100644 --- a/src/zmqcom.rs +++ b/src/zmqcom.rs @@ -7,7 +7,7 @@ const ZMQPROTO: &str = "tcp"; pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result { let zctx = zmq::Context::new(); let zmqhost = &zmqcfg.hostname; - let zmqport = zmqcfg.port; + let zmqport = &zmqcfg.port; let socket = zctx.socket(zmqtype).unwrap(); let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}"); socket.connect(&connectstring.as_str())?;