isolation of zmq received ips
This commit is contained in:
parent
ebd969f6f8
commit
23353211ae
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,5 +1,6 @@
|
|||||||
*.json
|
*.json
|
||||||
*.swp
|
*.swp
|
||||||
|
/*diff*
|
||||||
/*.gz
|
/*.gz
|
||||||
/perf*
|
/perf*
|
||||||
/sample
|
/sample
|
||||||
|
@ -14,6 +14,7 @@ use std::path::Path;
|
|||||||
|
|
||||||
const MASTERSERVER: &str = "ipbl.paulbsd.com";
|
const MASTERSERVER: &str = "ipbl.paulbsd.com";
|
||||||
const ZMQSUBSCRIPTION: &str = "ipbl";
|
const ZMQSUBSCRIPTION: &str = "ipbl";
|
||||||
|
const CONFIG_RETRY: u64 = 10;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Context {
|
pub struct Context {
|
||||||
@ -87,9 +88,8 @@ impl Context {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let retry = 10;
|
println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs");
|
||||||
println!("error loading config: {err}, retrying in {retry} secs");
|
std::thread::sleep(std::time::Duration::from_secs(CONFIG_RETRY));
|
||||||
std::thread::sleep(std::time::Duration::from_secs(retry));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -163,7 +163,7 @@ impl Context {
|
|||||||
if block.tryfail >= set.tryfail {
|
if block.tryfail >= set.tryfail {
|
||||||
res.push(block.ipdata.clone());
|
res.push(block.ipdata.clone());
|
||||||
if block.tryfail == set.tryfail {
|
if block.tryfail == set.tryfail {
|
||||||
block.starttime = now;
|
block.starttime = DateTime::from(now);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -176,11 +176,12 @@ impl Context {
|
|||||||
a.tryfail += 1;
|
a.tryfail += 1;
|
||||||
return a.ipdata.clone();
|
return a.ipdata.clone();
|
||||||
} else {
|
} else {
|
||||||
let now = Local::now().trunc_subsecs(0);
|
|
||||||
let mut tryfail = 0;
|
let mut tryfail = 0;
|
||||||
if ipdata.mode == "zmq".to_string() {
|
if ipdata.mode == "zmq".to_string() {
|
||||||
tryfail = 100;
|
tryfail = 100;
|
||||||
}
|
}
|
||||||
|
let starttime: DateTime<FixedOffset> =
|
||||||
|
DateTime::parse_from_rfc2822(ipdata.date.as_str()).unwrap();
|
||||||
match self.cfg.sets.get(&ipdata.src) {
|
match self.cfg.sets.get(&ipdata.src) {
|
||||||
Some(set) => {
|
Some(set) => {
|
||||||
self.blocklist.insert(
|
self.blocklist.insert(
|
||||||
@ -188,7 +189,7 @@ impl Context {
|
|||||||
BlockIpData {
|
BlockIpData {
|
||||||
ipdata: ipdata.clone(),
|
ipdata: ipdata.clone(),
|
||||||
tryfail,
|
tryfail,
|
||||||
starttime: now,
|
starttime,
|
||||||
blocktime: set.blocktime,
|
blocktime: set.blocktime,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
@ -243,7 +244,7 @@ impl Context {
|
|||||||
src.clone(),
|
src.clone(),
|
||||||
SetMap {
|
SetMap {
|
||||||
filename: set.filename.clone(),
|
filename: set.filename.clone(),
|
||||||
fullpath: fullpath,
|
fullpath,
|
||||||
set: set.clone(),
|
set: set.clone(),
|
||||||
regex: Regex::new(set.regex.as_str()).unwrap(),
|
regex: Regex::new(set.regex.as_str()).unwrap(),
|
||||||
wd: res,
|
wd: res,
|
||||||
@ -502,10 +503,10 @@ mod test {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
let mut ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap();
|
let mut ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap();
|
||||||
ip1.starttime = now - Duration::minutes(61);
|
ip1.starttime = DateTime::from(now) - Duration::minutes(61);
|
||||||
|
|
||||||
let mut ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap();
|
let mut ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap();
|
||||||
ip2.starttime = now - Duration::minutes(62);
|
ip2.starttime = DateTime::from(now) - Duration::minutes(62);
|
||||||
ctx
|
ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,7 +32,7 @@ pub struct BlockIpData {
|
|||||||
pub ipdata: IpData,
|
pub ipdata: IpData,
|
||||||
pub tryfail: i64,
|
pub tryfail: i64,
|
||||||
pub blocktime: i64,
|
pub blocktime: i64,
|
||||||
pub starttime: DateTime<Local>,
|
pub starttime: DateTime<FixedOffset>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PartialEq for IpData {
|
impl PartialEq for IpData {
|
||||||
|
@ -45,42 +45,42 @@ pub async fn process(ctx: &Arc<Mutex<Context>>) {
|
|||||||
compare_files_changes(&ctxarc, &mut blrx, &ipdatatx).await;
|
compare_files_changes(&ctxarc, &mut blrx, &ipdatatx).await;
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut ip = IpData {
|
let mut ip_init = IpData {
|
||||||
ip: "".to_string(),
|
ip: "".to_string(),
|
||||||
src: "".to_string(),
|
src: "".to_string(),
|
||||||
date: "".to_string(),
|
date: "".to_string(),
|
||||||
hostname: "".to_string(),
|
hostname: "".to_string(),
|
||||||
mode: "init".to_string(),
|
mode: "init".to_string(),
|
||||||
};
|
};
|
||||||
send_to_ipbl_zmq(&reqsocket, &mut ip).await;
|
send_to_ipbl_zmq(&reqsocket, &mut ip_init).await;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut ret: Vec<String> = Vec::new();
|
let mut ret: Vec<String> = Vec::new();
|
||||||
let begin: DateTime<Local> = Local::now().trunc_subsecs(0);
|
let begin: DateTime<Local> = Local::now().trunc_subsecs(0);
|
||||||
|
|
||||||
// wait for logs parse and zmq channel receive
|
// wait for logs parse and zmq channel receive
|
||||||
let mut ip = ipdatarx.recv().await.unwrap();
|
let mut received_ip = ipdatarx.recv().await.unwrap();
|
||||||
|
|
||||||
// lock the context mutex
|
// lock the context mutex
|
||||||
let ctxarc = Arc::clone(&ctx);
|
let ctxarc = Arc::clone(&ctx);
|
||||||
let mut ctx = ctxarc.lock().await;
|
let mut ctx = ctxarc.lock().await;
|
||||||
|
|
||||||
if ip.mode == "init" {
|
if received_ip.mode == "init" {
|
||||||
for i in &mut ctx.get_blocklist_toblock().await {
|
for ip_to_send in &mut ctx.get_blocklist_toblock().await {
|
||||||
i.mode = "zmq".to_string();
|
ip_to_send.mode = "zmq".to_string();
|
||||||
send_to_ipbl_zmq(&reqsocket, i).await;
|
send_to_ipbl_zmq(&reqsocket, ip_to_send).await;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// refresh context blocklist
|
// refresh context blocklist
|
||||||
ctx.update_blocklist(&ip).await;
|
ctx.update_blocklist(&received_ip).await;
|
||||||
ctx.gc_blocklist().await;
|
ctx.gc_blocklist().await;
|
||||||
|
|
||||||
// send ip list to ws and zmq sockets
|
// send ip list to ws and zmq sockets
|
||||||
if ip.hostname == ctx.hostname {
|
if received_ip.hostname == ctx.hostname && received_ip.mode != "zmq" {
|
||||||
send_to_ipbl_ws(&ctx, &ip, &mut ret).await;
|
send_to_ipbl_ws(&ctx, &received_ip, &mut ret).await;
|
||||||
send_to_ipbl_zmq(&reqsocket, &mut ip).await;
|
send_to_ipbl_zmq(&reqsocket, &mut received_ip).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// apply firewall blocking
|
// apply firewall blocking
|
||||||
@ -118,14 +118,9 @@ async fn watchfiles(ctx: &Arc<Mutex<Context>>) -> Receiver<FileEvent> {
|
|||||||
events = ctx.instance.read_events().unwrap();
|
events = ctx.instance.read_events().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
for event in events {
|
for inotifyevent in events {
|
||||||
let date: DateTime<Local> = Local::now().trunc_subsecs(0);
|
let date: DateTime<Local> = Local::now().trunc_subsecs(0);
|
||||||
bltx.send(FileEvent {
|
bltx.send(FileEvent { inotifyevent, date }).await.unwrap();
|
||||||
inotifyevent: event,
|
|
||||||
date: date,
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
use ipnet::Ipv4Net;
|
|
||||||
use nftnl::{nft_set, set::Set, Batch, FinalizedBatch, ProtoFamily, Table};
|
use nftnl::{nft_set, set::Set, Batch, FinalizedBatch, ProtoFamily, Table};
|
||||||
use std::{ffi::CString, io::*, net::Ipv4Addr};
|
use std::{ffi::CString, io::*, net::Ipv4Addr};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user