275 lines
8.9 KiB
Rust
275 lines
8.9 KiB
Rust
use crate::api::apiserver;
|
|
use crate::config::{Context, GIT_VERSION};
|
|
use crate::fw::{fwblock, fwinit};
|
|
use crate::ip::{filter, IpData, IpEvent};
|
|
use crate::utils::{read_lines, sleep_ms};
|
|
use crate::ws::send_to_ipbl_ws;
|
|
use crate::zmqcom::{send_to_ipbl_zmq, zmqinit};
|
|
|
|
use chrono::prelude::*;
|
|
use chrono::prelude::{DateTime, Local};
|
|
use chrono::Duration;
|
|
use nix::sys::inotify::InotifyEvent;
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
|
use tokio::sync::RwLock;
|
|
|
|
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
|
|
const BL_CHAN_SIZE: usize = 32;
|
|
const ZMQ_CHAN_SIZE: usize = 64;
|
|
const API_CHAN_SIZE: usize = 64;
|
|
|
|
pub async fn run() {
|
|
let globalctx = Context::new().await;
|
|
let ctxarc = Arc::new(RwLock::new(globalctx));
|
|
let mut ret: Vec<String> = Vec::new();
|
|
|
|
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
|
|
|
|
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
|
|
println!("Launching {}, version {}", PKG_NAME, pkgversion);
|
|
|
|
let (_apitx, mut apirx): (Sender<String>, Receiver<String>) = channel(API_CHAN_SIZE);
|
|
//let tcpsocket = apiinit(&ctx, &apitx).await;
|
|
|
|
let ctxclone = Arc::clone(&ctxarc);
|
|
apiserver(&ctxclone).await.unwrap();
|
|
|
|
// initialize the firewall table
|
|
fwinit();
|
|
let mut fwlen: usize = 0;
|
|
|
|
// initialize zeromq sockets
|
|
let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(ZMQ_CHAN_SIZE);
|
|
let zmqreqsocket = zmqinit(&ctxclone, &ipeventtx).await;
|
|
|
|
let mut blrx = watchfiles(&ctxclone).await;
|
|
|
|
let ctxclone = Arc::clone(&ctxarc);
|
|
tokio::spawn(async move {
|
|
compare_files_changes(&ctxclone, &mut blrx, &ipeventtx).await;
|
|
});
|
|
|
|
let ipevent_bootstrap = IpEvent {
|
|
msgtype: String::from("bootstrap"),
|
|
mode: String::from("zmq"),
|
|
ipdata: IpData {
|
|
ip: "".to_string(),
|
|
src: "".to_string(),
|
|
date: "".to_string(),
|
|
hostname: "".to_string(),
|
|
},
|
|
};
|
|
send_to_ipbl_zmq(&zmqreqsocket, &ipevent_bootstrap, &mut ret).await;
|
|
|
|
loop {
|
|
ret = Vec::new();
|
|
|
|
// wait for logs parse and zmq channel receive
|
|
//let mut received_ip = ipdatarx.recv();
|
|
let ipdata_wait = ipeventrx.recv();
|
|
let apimsg_wait = apirx.recv();
|
|
let force_wait = sleep_ms(200);
|
|
|
|
let ctxclone = Arc::clone(&ctxarc);
|
|
|
|
tokio::select! {
|
|
val = ipdata_wait => {
|
|
let received_ip = val.unwrap();
|
|
|
|
let mut ctx = ctxclone.write().await;
|
|
|
|
if received_ip.msgtype == "bootstrap".to_string() {
|
|
for ip_to_send in ctx.get_blocklist_toblock().await {
|
|
let ipe = IpEvent{
|
|
msgtype: String::from("init"),
|
|
mode: String::from("zmq"),
|
|
ipdata: ip_to_send,
|
|
};
|
|
send_to_ipbl_zmq(&zmqreqsocket, &ipe, &mut ret).await;
|
|
}
|
|
continue
|
|
}
|
|
|
|
// refresh context blocklist
|
|
let filtered_ip = ctx.update_blocklist(&received_ip).await;
|
|
|
|
// send ip list to ws and zmq sockets
|
|
if let Some(ip) = filtered_ip {
|
|
println!("{}",ip);
|
|
if received_ip.msgtype != "init" {
|
|
println!("sending {} to ws and zmq", ip.ip);
|
|
let event = IpEvent{
|
|
msgtype: String::from("add"),
|
|
mode:String::from("zmq"),
|
|
ipdata: ip,
|
|
};
|
|
send_to_ipbl_ws(&ctx, &event, &mut ret).await;
|
|
send_to_ipbl_zmq(&zmqreqsocket, &event, &mut ret).await;
|
|
}
|
|
}
|
|
}
|
|
_val = apimsg_wait => {}
|
|
_val = force_wait => {}
|
|
};
|
|
|
|
let toblock;
|
|
{
|
|
let mut ctx = ctxarc.write().await;
|
|
ctx.gc_blocklist().await;
|
|
toblock = ctx.get_blocklist_toblock().await;
|
|
}
|
|
// apply firewall blocking
|
|
match fwblock(&toblock, &mut ret, &mut fwlen) {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
println!("Err: {err}, unable to push firewall rules, use super user")
|
|
}
|
|
};
|
|
|
|
// log lines
|
|
if ret.len() > 0 {
|
|
println!("{ret}", ret = ret.join(", "));
|
|
}
|
|
|
|
{
|
|
let now_cfg_reload = Local::now().trunc_subsecs(0);
|
|
if (now_cfg_reload - last_cfg_reload) > Duration::seconds(5) {
|
|
// reload configuration from the server
|
|
let mut ctx = ctxclone.write().await;
|
|
match ctx.load().await {
|
|
Ok(_) => {
|
|
last_cfg_reload = Local::now().trunc_subsecs(0);
|
|
}
|
|
Err(err) => {
|
|
println!("error loading config: {err}");
|
|
}
|
|
}
|
|
};
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn watchfiles(ctxarc: &Arc<RwLock<Context>>) -> Receiver<FileEvent> {
|
|
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
|
|
let ctxclone = Arc::clone(ctxarc);
|
|
tokio::spawn(async move {
|
|
loop {
|
|
let events;
|
|
let instance;
|
|
{
|
|
let ctx = ctxclone.read().await;
|
|
instance = ctx.instance.clone();
|
|
}
|
|
|
|
events = instance.read_events().unwrap();
|
|
|
|
for inevent in events {
|
|
let date: DateTime<Local> = Local::now().trunc_subsecs(0);
|
|
bltx.send(FileEvent { inevent, date }).await.unwrap();
|
|
}
|
|
}
|
|
});
|
|
blrx
|
|
}
|
|
|
|
/// Records the current size of `path` in `w` and reports how it changed.
///
/// * `w` - map from file path to the size seen on the previous call; it is
///   updated in place with the size observed now.
/// * `path` - file whose size is looked up.
///
/// Returns `(previous_size, changed)`, where `previous_size` is the value that
/// was stored for `path` before this call (`0` if there was none) and `changed`
/// is `true` when it differs from the current size.
///
/// A file whose metadata cannot be read (missing, permission denied, ...) is
/// treated as having size `0`.
async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, bool) {
    let currentlen = std::fs::metadata(path)
        .map(|meta| meta.len())
        .unwrap_or(0);
    // `insert` hands back the previous entry, which is exactly the last known size.
    let lastlen = w.insert(path.to_string(), currentlen).unwrap_or(0);
    (lastlen, lastlen != currentlen)
}
|
|
|
|
async fn compare_files_changes(
|
|
ctxarc: &Arc<RwLock<Context>>,
|
|
inrx: &mut Receiver<FileEvent>,
|
|
ipeventtx: &Sender<IpEvent>,
|
|
) {
|
|
let mut tnets;
|
|
loop {
|
|
let modfiles = inrx.recv().await.unwrap();
|
|
let mut iplist: Vec<IpData> = vec![];
|
|
|
|
let sask;
|
|
let sas;
|
|
{
|
|
let ctx = ctxarc.read().await;
|
|
sas = ctx.clone().sas;
|
|
sask = sas.keys();
|
|
tnets = ctx.cfg.build_trustnets();
|
|
}
|
|
|
|
match modfiles.inevent.name {
|
|
Some(name) => {
|
|
let filename = name.to_str().unwrap();
|
|
for sak in sask {
|
|
let sa = sas.get(sak).unwrap();
|
|
if modfiles.inevent.wd == sa.wd {
|
|
let handle: String;
|
|
if sa.filename.as_str() == "" {
|
|
handle = format!("{}/{}", &sa.fullpath, filename);
|
|
} else if filename.starts_with(sa.filename.as_str()) {
|
|
handle = sa.fullpath.to_owned();
|
|
} else {
|
|
continue;
|
|
}
|
|
|
|
let (filesize, sizechanged);
|
|
{
|
|
let mut ctx = ctxarc.write().await;
|
|
let sa = ctx.sas.get_mut(sak).unwrap();
|
|
(filesize, sizechanged) =
|
|
get_last_file_size(&mut sa.watchedfiles, &handle).await;
|
|
}
|
|
|
|
if !sizechanged {
|
|
continue;
|
|
}
|
|
|
|
match read_lines(&handle, filesize) {
|
|
Some(lines) => {
|
|
filter(
|
|
lines,
|
|
&mut iplist,
|
|
&tnets,
|
|
&sa.regex,
|
|
&sa.set.src,
|
|
&modfiles.date,
|
|
);
|
|
}
|
|
None => {}
|
|
};
|
|
break;
|
|
}
|
|
}
|
|
for ip in iplist {
|
|
let ipevent = IpEvent {
|
|
msgtype: String::from("add"),
|
|
mode: String::from("file"),
|
|
ipdata: ip,
|
|
};
|
|
ipeventtx.send(ipevent).await.unwrap();
|
|
}
|
|
}
|
|
None => {}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct FileEvent {
|
|
pub inevent: InotifyEvent,
|
|
pub date: DateTime<Local>,
|
|
}
|
|
|
|
impl std::fmt::Debug for FileEvent {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
write!(f, "{ie:?}", ie = self.inevent)
|
|
}
|
|
}
|