ipblc/src/ipblc.rs

252 lines
8.0 KiB
Rust
Raw Normal View History

2023-01-08 14:09:13 +01:00
use crate::api::apiserver;
2022-12-30 20:18:15 +01:00
use crate::config::{Context, GIT_VERSION};
2023-01-08 21:16:06 +01:00
use crate::fw::{fwblock, fwinit};
2023-01-08 14:09:13 +01:00
use crate::ip::{filter, IpData};
use crate::utils::read_lines;
use crate::ws::send_to_ipbl_ws;
use crate::zmqcom::{send_to_ipbl_zmq, zmqinit};
use chrono::prelude::*;
2022-12-30 20:18:15 +01:00
use chrono::prelude::{DateTime, Local};
2022-07-01 15:51:41 +02:00
use chrono::Duration;
2022-12-30 20:18:15 +01:00
use nix::sys::inotify::InotifyEvent;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender};
2023-01-10 18:00:40 +01:00
use tokio::sync::RwLock;
2023-01-08 21:16:06 +01:00
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
const BL_CHAN_SIZE: usize = 32;
const ZMQ_CHAN_SIZE: usize = 64;
2023-01-08 14:09:13 +01:00
const API_CHAN_SIZE: usize = 64;
2022-12-30 20:18:15 +01:00
pub async fn run() {
2023-01-10 18:00:40 +01:00
let ctxarc = Arc::new(RwLock::new(Context::new().await));
2023-01-08 21:16:06 +01:00
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
println!("Launching {}, version {}", PKG_NAME, pkgversion);
let (_apitx, mut apirx): (Sender<String>, Receiver<String>) = channel(API_CHAN_SIZE);
2023-01-08 14:09:13 +01:00
//let tcpsocket = apiinit(&ctx, &apitx).await;
2023-01-08 21:16:06 +01:00
let ctxclone = Arc::clone(&ctxarc);
apiserver(&ctxclone).await.unwrap();
// initialize the firewall table
2023-01-08 21:16:06 +01:00
fwinit();
2022-09-17 22:31:30 +02:00
let mut fwlen: usize = 0;
// initialize zeromq sockets
2023-01-08 14:09:13 +01:00
let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE);
2023-01-08 21:16:06 +01:00
let zmqreqsocket = zmqinit(&ctxclone, &ipdatatx).await;
2023-01-08 21:16:06 +01:00
let mut blrx = watchfiles(&ctxclone).await;
2023-01-08 21:16:06 +01:00
let ctxclone = Arc::clone(&ctxarc);
tokio::spawn(async move {
2023-01-08 21:16:06 +01:00
compare_files_changes(&ctxclone, &mut blrx, &ipdatatx).await;
});
2022-09-17 21:24:36 +02:00
let mut ip_init = IpData {
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
mode: "init".to_string(),
};
2023-01-08 14:09:13 +01:00
send_to_ipbl_zmq(&zmqreqsocket, &mut ip_init).await;
loop {
let mut ret: Vec<String> = Vec::new();
// wait for logs parse and zmq channel receive
2023-01-08 14:09:13 +01:00
//let mut received_ip = ipdatarx.recv();
2023-01-08 21:16:06 +01:00
let ipdata_wait = ipdatarx.recv();
let apimsg_wait = apirx.recv();
2023-01-08 21:16:06 +01:00
let ctxclone = Arc::clone(&ctxarc);
2023-01-08 14:09:13 +01:00
tokio::select! {
2023-01-08 21:16:06 +01:00
val = ipdata_wait => {
2023-01-15 15:32:41 +01:00
let received_ip = val.unwrap();
2023-01-08 21:16:06 +01:00
2023-01-10 18:00:40 +01:00
let mut ctx = ctxclone.write().await;
2023-01-08 14:09:13 +01:00
if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() {
for ip_to_send in &mut ctx.get_blocklist_toblock().await {
ip_to_send.mode = "init".to_string();
send_to_ipbl_zmq(&zmqreqsocket, ip_to_send).await;
}
continue;
}
2023-01-08 14:09:13 +01:00
// refresh context blocklist
2023-01-15 15:32:41 +01:00
let filtered_ip = ctx.update_blocklist(&received_ip).await;
2023-01-08 14:09:13 +01:00
// send ip list to ws and zmq sockets
2023-01-15 15:32:41 +01:00
if let Some(ip) = filtered_ip {
println!("sending {} to ws and zmq", ip.ip);
send_to_ipbl_ws(&ctx, &ip, &mut ret).await;
send_to_ipbl_zmq(&zmqreqsocket, &ip).await;
2022-07-01 15:51:41 +02:00
}
2023-01-08 14:09:13 +01:00
}
2023-01-08 21:16:06 +01:00
_val = apimsg_wait => {
2023-01-08 14:09:13 +01:00
}
};
2023-01-08 21:16:06 +01:00
let toblock;
{
2023-01-10 18:00:40 +01:00
let mut ctx = ctxarc.write().await;
2023-01-08 21:16:06 +01:00
ctx.gc_blocklist().await;
toblock = ctx.get_blocklist_toblock().await;
}
// apply firewall blocking
match fwblock(&toblock, &mut ret, &mut fwlen) {
Ok(_) => {}
Err(err) => {
println!("Err: {err}, unable to push firewall rules, use super user")
}
};
2023-01-08 21:16:06 +01:00
// log lines
if ret.len() > 0 {
println!("{ret}", ret = ret.join(", "));
}
{
let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - last_cfg_reload) > Duration::seconds(5) {
// reload configuration from the server
2023-01-10 18:00:40 +01:00
let mut ctx = ctxclone.write().await;
2023-01-08 21:16:06 +01:00
match ctx.load().await {
Ok(_) => {
last_cfg_reload = Local::now().trunc_subsecs(0);
}
Err(err) => {
println!("error loading config: {err}");
}
}
};
}
}
}
2023-01-10 18:00:40 +01:00
async fn watchfiles(ctxarc: &Arc<RwLock<Context>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
2023-01-08 21:16:06 +01:00
let ctxclone = Arc::clone(ctxarc);
tokio::spawn(async move {
loop {
2023-01-08 21:16:06 +01:00
let events;
let instance;
{
2023-01-10 18:00:40 +01:00
let ctx = ctxclone.read().await;
2023-01-08 21:16:06 +01:00
instance = ctx.instance.clone();
}
2023-01-08 21:16:06 +01:00
events = instance.read_events().unwrap();
2022-09-21 21:03:01 +02:00
for inevent in events {
let date: DateTime<Local> = Local::now().trunc_subsecs(0);
2022-09-21 21:03:01 +02:00
bltx.send(FileEvent { inevent, date }).await.unwrap();
}
}
});
blrx
}
2022-09-23 13:17:20 +02:00
async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, bool) {
let currentlen = match std::fs::metadata(&path.to_string()) {
Ok(u) => u.len().clone(),
2022-09-23 13:17:20 +02:00
Err(_) => 0u64,
};
2022-09-21 21:03:01 +02:00
let lastlen = match w.insert(path.to_string(), currentlen) {
Some(u) => u,
2022-09-23 13:17:20 +02:00
None => 0u64,
};
2022-09-23 13:17:20 +02:00
(lastlen, lastlen != currentlen)
}
async fn compare_files_changes(
2023-01-10 18:00:40 +01:00
ctxarc: &Arc<RwLock<Context>>,
2022-09-21 21:03:01 +02:00
inrx: &mut Receiver<FileEvent>,
ipdatatx: &Sender<IpData>,
) {
2022-09-21 21:03:01 +02:00
let mut tnets;
loop {
2022-09-21 21:03:01 +02:00
let modfiles = inrx.recv().await.unwrap();
let mut iplist: Vec<IpData> = vec![];
2023-01-08 21:16:06 +01:00
let sask;
let sas;
{
2023-01-10 18:00:40 +01:00
let ctx = ctxarc.read().await;
2023-01-08 21:16:06 +01:00
sas = ctx.clone().sas;
sask = sas.keys();
tnets = ctx.cfg.build_trustnets();
}
2022-09-21 21:03:01 +02:00
match modfiles.inevent.name {
Some(name) => {
2022-09-21 21:03:01 +02:00
let filename = name.to_str().unwrap();
2023-01-08 21:16:06 +01:00
for sak in sask {
let sa = sas.get(sak).unwrap();
2022-09-21 21:03:01 +02:00
if modfiles.inevent.wd == sa.wd {
let handle: String;
if sa.filename.as_str() == "" {
2022-09-21 21:03:01 +02:00
handle = format!("{}/{}", &sa.fullpath, filename);
} else if filename.starts_with(sa.filename.as_str()) {
handle = sa.fullpath.to_owned();
} else {
continue;
}
2023-01-08 21:16:06 +01:00
let (filesize, sizechanged);
{
2023-01-10 18:00:40 +01:00
let mut ctx = ctxarc.write().await;
2023-01-08 21:16:06 +01:00
let sa = ctx.sas.get_mut(sak).unwrap();
(filesize, sizechanged) =
get_last_file_size(&mut sa.watchedfiles, &handle).await;
}
2022-09-23 13:17:20 +02:00
if !sizechanged {
continue;
}
2022-09-21 21:03:01 +02:00
match read_lines(&handle, filesize) {
Some(lines) => {
filter(
lines,
2022-09-21 21:03:01 +02:00
&mut iplist,
&tnets,
&sa.regex,
2022-07-01 15:51:41 +02:00
&sa.set.src,
2022-09-21 21:03:01 +02:00
&modfiles.date,
);
}
None => {}
};
break;
}
}
2022-09-21 21:03:01 +02:00
for ip in iplist {
ipdatatx.send(ip).await.unwrap();
}
}
None => {}
}
}
}
2022-12-30 20:18:15 +01:00
pub struct FileEvent {
pub inevent: InotifyEvent,
pub date: DateTime<Local>,
}
impl std::fmt::Debug for FileEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{ie:?}", ie = self.inevent)
}
}