refactor of modules

This commit is contained in:
Paul 2022-12-30 20:18:15 +01:00
parent bbce4547cf
commit ff99fce62b
9 changed files with 119 additions and 124 deletions

View File

@ -1,5 +1,5 @@
use crate::ip::{BlockIpData, IpData}; use crate::ip::{BlockIpData, IpData};
use crate::utils::*; use crate::utils::{gethostname, sleep_s};
use chrono::prelude::*; use chrono::prelude::*;
use chrono::Duration; use chrono::Duration;
@ -90,7 +90,7 @@ impl Context {
} }
Err(err) => { Err(err) => {
println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs"); println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs");
sleep(CONFIG_RETRY); sleep_s(CONFIG_RETRY);
} }
} }
} }
@ -99,7 +99,7 @@ impl Context {
pub fn argparse() -> ArgMatches { pub fn argparse() -> ArgMatches {
Command::new(env!("CARGO_PKG_NAME")) Command::new(env!("CARGO_PKG_NAME"))
.version(format!("{}/{}", env!("CARGO_PKG_VERSION"), GIT_VERSION).as_str()) .version(format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION).as_str())
.author(env!("CARGO_PKG_AUTHORS")) .author(env!("CARGO_PKG_AUTHORS"))
.about(env!("CARGO_PKG_DESCRIPTION")) .about(env!("CARGO_PKG_DESCRIPTION"))
.arg( .arg(

View File

@ -36,6 +36,9 @@ pub fn block(
// add chain // add chain
batch.add(&chain, nftnl::MsgType::Add); batch.add(&chain, nftnl::MsgType::Add);
let rule = Rule::new(&chain);
batch.add(&rule, nftnl::MsgType::Del);
let mut rule = Rule::new(&chain); let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(ct state)); rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32)); rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));

View File

@ -146,7 +146,7 @@ pub fn filter(
trustnets: &Vec<IpNet>, trustnets: &Vec<IpNet>,
regex: &Regex, regex: &Regex,
src: &String, src: &String,
lastprocess: &DateTime<Local>, last: &DateTime<Local>,
) -> isize { ) -> isize {
let mut ips = 0; let mut ips = 0;
let hostname = gethostname(true); let hostname = gethostname(true);
@ -160,15 +160,14 @@ pub fn filter(
s_ipaddr = sv4.get(0).unwrap().as_str().to_string(); s_ipaddr = sv4.get(0).unwrap().as_str().to_string();
} }
None => { None => {
continue; match R_IPV6.captures(l.as_str()) {
/*match R_IPV6.captures(l.as_str()) {
Some(sv6) => { Some(sv6) => {
s_ipaddr = sv6.get(0).unwrap().as_str().to_string(); s_ipaddr = sv6.get(0).unwrap().as_str().to_string();
} }
None => { None => {
continue; continue;
} }
};*/ };
} }
}; };
@ -176,7 +175,7 @@ pub fn filter(
match R_DATE.captures(l.as_str()) { match R_DATE.captures(l.as_str()) {
Some(sdt) => { Some(sdt) => {
s_date = parse_date(sdt); s_date = parse_date(sdt);
if &s_date < lastprocess { if &s_date < last {
continue; continue;
} }
} }

View File

@ -1,7 +1,13 @@
use super::*; use crate::config::{Context, GIT_VERSION};
use crate::fw;
use crate::ip::{filter, push_ip, IpData};
use crate::utils::{gethostname, read_lines, sleep_s};
use crate::zmqcom::*;
use chrono::prelude::*; use chrono::prelude::*;
use chrono::prelude::{DateTime, Local};
use chrono::Duration; use chrono::Duration;
use nix::sys::inotify::InotifyEvent;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
@ -10,17 +16,18 @@ use tokio::sync::Mutex;
const BL_CHAN_SIZE: usize = 32; const BL_CHAN_SIZE: usize = 32;
const ZMQ_CHAN_SIZE: usize = 64; const ZMQ_CHAN_SIZE: usize = 64;
pub async fn process(ctx: &Arc<Mutex<Context>>) { pub async fn run() {
let ctx = Arc::new(Mutex::new(Context::new().await));
println!( println!(
"Launching {} version {}", "Launching {}, version {}",
env!("CARGO_PKG_NAME"), env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION") format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION)
); );
let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE); let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE);
// initialize the firewall table // initialize the firewall table
firewall::init(&env!("CARGO_PKG_NAME").to_string()); fw::init(&env!("CARGO_PKG_NAME").to_string());
let mut fwlen: usize = 0; let mut fwlen: usize = 0;
// initialize zeromq sockets // initialize zeromq sockets
@ -85,7 +92,7 @@ pub async fn process(ctx: &Arc<Mutex<Context>>) {
} }
// apply firewall blocking // apply firewall blocking
firewall::block( fw::block(
&env!("CARGO_PKG_NAME").to_string(), &env!("CARGO_PKG_NAME").to_string(),
&ctx.get_blocklist_toblock().await, &ctx.get_blocklist_toblock().await,
&mut ret, &mut ret,
@ -204,3 +211,90 @@ async fn compare_files_changes(
} }
} }
} }
pub struct FileEvent {
pub inevent: InotifyEvent,
pub date: DateTime<Local>,
}
impl std::fmt::Debug for FileEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{ie:?}", ie = self.inevent)
}
}
async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}
Err(e) => {
println!("{e:?}")
}
};
match reqsocket.recv_string(0) {
Ok(o) => match o {
Ok(_) => {}
Err(ee) => {
println!("{ee:?}")
}
},
Err(e) => {
println!("{e:?}")
}
};
}
async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec<String>) {
ret.push(format!("host: {hostname}", hostname = ctx.hostname));
loop {
match push_ip(&ctx, &ip, ret).await {
Ok(_) => {
break;
}
Err(err) => {
println!("{err}");
sleep_s(1);
}
};
}
}
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
let ctx = ctx.lock().await;
let prefix = format!(
"{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
drop(ctx);
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(e) => {
println!("{e:?}");
None
}
},
Err(e) => {
println!("{e:?}");
None
}
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpData = serde_json::from_str(msg).unwrap();
if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() {
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}

View File

@ -1,100 +0,0 @@
pub mod inc;
use crate::config::*;
use crate::firewall;
use crate::ip::*;
use crate::utils::*;
use crate::zmqcom::*;
use chrono::prelude::{DateTime, Local};
use nix::sys::inotify::InotifyEvent;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
pub struct FileEvent {
pub inevent: InotifyEvent,
pub date: DateTime<Local>,
}
impl std::fmt::Debug for FileEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{ie:?}", ie = self.inevent)
}
}
async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) {
let msg = format!("{value}", value = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}
Err(e) => {
println!("{e:?}")
}
};
match reqsocket.recv_string(0) {
Ok(o) => match o {
Ok(_) => {}
Err(ee) => {
println!("{ee:?}")
}
},
Err(e) => {
println!("{e:?}")
}
};
}
async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec<String>) {
ret.push(format!("host: {hostname}", hostname = ctx.hostname));
loop {
match push_ip(&ctx, &ip, ret).await {
Ok(_) => {
break;
}
Err(err) => {
println!("{err}");
sleep(1);
}
};
}
}
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
let ctx = ctx.lock().await;
let prefix = format!(
"{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
drop(ctx);
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(e) => {
println!("{e:?}");
None
}
},
Err(e) => {
println!("{e:?}");
None
}
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpData = serde_json::from_str(msg).unwrap();
if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() {
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}

View File

@ -1,17 +1,12 @@
mod config; mod config;
mod firewall; mod fw;
mod ip; mod ip;
mod ipblc; mod ipblc;
mod utils; mod utils;
mod zmqcom; mod zmqcom;
use config::Context;
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main] #[tokio::main]
pub async fn main() { pub async fn main() {
// Create a new context // Create a new context
let ctx = Arc::new(Mutex::new(Context::new().await)); ipblc::run().await;
ipblc::inc::process(&ctx).await;
} }

View File

@ -33,8 +33,12 @@ pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
list.len() list.len()
} }
pub fn sleep(seconds: u64) { pub fn sleep_ms(ms: u64) {
std::thread::sleep(Duration::from_secs(seconds)); std::thread::sleep(Duration::from_millis(ms));
}
pub fn sleep_s(s: u64) {
std::thread::sleep(Duration::from_secs(s));
} }
pub fn gethostname(show_fqdn: bool) -> String { pub fn gethostname(show_fqdn: bool) -> String {

View File

@ -7,7 +7,7 @@ const ZMQPROTO: &str = "tcp";
pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result<zmq::Socket, zmq::Error> { pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result<zmq::Socket, zmq::Error> {
let zctx = zmq::Context::new(); let zctx = zmq::Context::new();
let zmqhost = &zmqcfg.hostname; let zmqhost = &zmqcfg.hostname;
let zmqport = zmqcfg.port; let zmqport = &zmqcfg.port;
let socket = zctx.socket(zmqtype).unwrap(); let socket = zctx.socket(zmqtype).unwrap();
let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}"); let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}");
socket.connect(&connectstring.as_str())?; socket.connect(&connectstring.as_str())?;