Compare commits

..

33 Commits

Author SHA1 Message Date
1998e6e77a feat: bump 1.7.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2024-10-06 10:28:15 +02:00
f0cb50e797 Merge pull request 'add systemd notify support' (#14) from develop into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #14
2024-10-02 19:32:47 +02:00
3c4d6fb2cf update errors and variable names
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-10-02 19:11:44 +02:00
bdf41fa605 add systemd notify statuses
Some checks failed
continuous-integration/drone/push Build is failing
2024-10-02 19:07:27 +02:00
ebb6e5ec6d added sd-notify dependency
All checks were successful
continuous-integration/drone/push Build is passing
2024-10-02 17:45:26 +02:00
46eaf6017f updated dependencies
All checks were successful
continuous-integration/drone/push Build is passing
2024-05-28 14:50:07 +02:00
9b456d403f updated dependencies
All checks were successful
continuous-integration/drone/push Build is passing
2024-05-24 12:44:32 +02:00
71d640f393 updated dependencies
All checks were successful
continuous-integration/drone/push Build is passing
2024-05-24 12:43:24 +02:00
0a82d46bf1 updated dependencies
All checks were successful
continuous-integration/drone/push Build is passing
2024-05-11 18:01:32 +02:00
29472b4d7f updated dependencies
All checks were successful
continuous-integration/drone/push Build is passing
2024-05-11 17:59:36 +02:00
22214b8d55 Merge pull request 'updated dependencies - fosdem 2024 commit' (#13) from develop into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone Build is passing
Reviewed-on: #13
2024-02-03 10:21:22 +01:00
9bae2248df updated dependencies - fosdem 2024 commit
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2024-02-03 10:13:00 +01:00
ecd35fd37a update to 1.6.8
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2024-01-04 11:05:47 +01:00
129a7e9ada updated dependencies
All checks were successful
continuous-integration/drone/push Build is passing
2024-01-03 21:47:13 +01:00
1e2f047824 add ips in chunks to nftables
All checks were successful
continuous-integration/drone/push Build is passing
2024-01-03 21:44:00 +01:00
a60ec90608 update to 1.6.7
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2023-12-26 13:14:08 +01:00
ce6ca78087 added safety in ipblc
All checks were successful
continuous-integration/drone/push Build is passing
2023-12-26 13:13:30 +01:00
2e6e7efdbf hotfix on ws connections
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2023-12-26 11:11:38 +01:00
bae5443ca4 update to version 1.6.6
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/tag Build is passing
2023-12-26 10:49:21 +01:00
f29ccd3f0b updated ipevent with Option<IpData>
Some checks failed
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is failing
2023-12-26 10:42:39 +01:00
6c43635c92 update to version 1.6.6
All checks were successful
continuous-integration/drone/push Build is passing
2023-12-24 07:44:21 +01:00
1067566e9d updated dependencies
All checks were successful
continuous-integration/drone/push Build is passing
2023-12-23 13:19:14 +01:00
d47a4e218d update to version 1.6.5
All checks were successful
continuous-integration/drone/push Build is passing
2023-12-23 13:18:12 +01:00
0b67bbdab3 update to version 1.6.5
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2023-12-23 13:13:52 +01:00
809b252df7 added error handling for monitoring
All checks were successful
continuous-integration/drone/push Build is passing
2023-12-12 22:41:21 +01:00
5d132c6380 Merge branch 'develop'
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
2023-12-04 12:31:08 +01:00
80c3faec58 fix exception handling on fw.rs
All checks were successful
continuous-integration/drone/push Build is passing
2023-12-04 12:18:59 +01:00
103f8ea411 Merge pull request 'update to version 1.6.4' (#11) from develop into master
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
Reviewed-on: #11
2023-11-27 13:49:42 +01:00
104d1558b1 update to version 1.6.4
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2023-11-27 13:44:44 +01:00
ad8744a92c Merge pull request 'fix of websocket error' (#10) from develop into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #10
2023-11-25 18:01:12 +01:00
1313296acf updated dependencies
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is passing
2023-11-25 17:46:24 +01:00
46a01efeea fix return in websocket.rs/send_to_ipbl_websocket
Some checks failed
continuous-integration/drone/push Build is failing
2023-11-25 17:35:48 +01:00
c681825efe fix error handling in websocket
All checks were successful
continuous-integration/drone/push Build is passing
2023-11-24 21:16:07 +01:00
10 changed files with 702 additions and 454 deletions

815
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package] [package]
name = "ipblc" name = "ipblc"
version = "1.6.3" version = "1.7.0"
edition = "2021" edition = "2021"
authors = ["PaulBSD <paul@paulbsd.com>"] authors = ["PaulBSD <paul@paulbsd.com>"]
description = "ipblc is a tool that search and send attacking ip addresses to ipbl" description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
@ -21,8 +21,9 @@ regex = "1.10"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
tokio = { version = "1.34", features = ["full", "sync"] } sd-notify = { version = "0.4" }
tungstenite = { version = "0.20", features = ["handshake", "rustls-tls-native-roots"] } tokio = { version = "1.35", features = ["full", "sync"] }
tungstenite = { version = "0.21", features = ["handshake", "rustls-tls-native-roots"] }
## to optimize binary size (slow compile time) ## to optimize binary size (slow compile time)
#[profile.release] #[profile.release]

View File

@ -17,7 +17,7 @@ use std::path::Path;
pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]); pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]);
const MASTERSERVER: &str = "ipbl.paulbsd.com"; const MASTERSERVER: &str = "ipbl.paulbsd.com";
const WSSUBSCRIPTION: &str = "ipbl"; const WSSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY: u64 = 2; const CONFIG_RETRY_INTERVAL: u64 = 2;
const WEB_CLIENT_TIMEOUT: i64 = 5; const WEB_CLIENT_TIMEOUT: i64 = 5;
#[derive(Debug)] #[derive(Debug)]
@ -103,12 +103,12 @@ impl Context {
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(re) => re, Ok(o) => o,
Err(err) => return Err(err), Err(e) => return Err(e),
}; };
let data: Discovery = match req.json().await { let data: Discovery = match req.json().await {
Ok(res) => res, Ok(o) => o,
Err(err) => return Err(err), Err(e) => return Err(e),
}; };
Ok(data) Ok(data)
} }
@ -127,10 +127,10 @@ impl Context {
} }
break; break;
} }
Err(err) => { Err(e) => {
println!("error loading config: {err}, retrying in {CONFIG_RETRY}s"); println!("error loading config: {e}, retrying in {CONFIG_RETRY_INTERVAL}s");
last_in_err = true; last_in_err = true;
sleep_s(CONFIG_RETRY).await; sleep_s(CONFIG_RETRY_INTERVAL).await;
} }
}; };
} }
@ -169,18 +169,19 @@ impl Context {
} }
pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> { pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> {
match self.cfg.sets.get(&ipevent.ipdata.src) { match &ipevent.ipdata {
Some(ipdata) => match self.cfg.sets.get(&ipdata.src) {
Some(set) => { Some(set) => {
let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str()) let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str())
.unwrap() .unwrap()
.with_timezone(&chrono::Local); .with_timezone(&chrono::Local);
let blocktime = set.blocktime; let blocktime = set.blocktime;
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname { if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
let block = self let block =
.blocklist self.blocklist
.entry(ipevent.ipdata.ip.to_string()) .entry(ipdata.ip.to_string())
.or_insert(BlockIpData { .or_insert(BlockIpData {
ipdata: ipevent.ipdata.clone(), ipdata: ipdata.clone(),
tryfail: 0, tryfail: 0,
starttime, starttime,
blocktime, blocktime,
@ -192,9 +193,9 @@ impl Context {
} }
} else { } else {
self.blocklist self.blocklist
.entry(ipevent.ipdata.ip.to_string()) .entry(ipdata.ip.to_string())
.or_insert(BlockIpData { .or_insert(BlockIpData {
ipdata: ipevent.ipdata.clone(), ipdata: ipdata.clone(),
tryfail: set.tryfail, tryfail: set.tryfail,
starttime, starttime,
blocktime, blocktime,
@ -202,6 +203,8 @@ impl Context {
} }
} }
None => {} None => {}
},
None => {}
} }
None None
} }
@ -362,12 +365,12 @@ impl Config {
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(re) => re, Ok(o) => o,
Err(err) => return Err(err), Err(e) => return Err(e),
}; };
let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await { let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await {
Ok(res) => res, Ok(o) => o,
Err(err) => return Err(err), Err(e) => return Err(e),
}; };
for d in data.sets { for d in data.sets {
@ -397,13 +400,13 @@ impl Config {
.await; .await;
let req = match resp { let req = match resp {
Ok(re) => re, Ok(o) => o,
Err(err) => return Err(err), Err(e) => return Err(e),
}; };
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await { let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(res) => res, Ok(o) => o,
Err(err) => return Err(err), Err(e) => return Err(e),
}; };
Ok(data) Ok(data)
@ -414,8 +417,8 @@ impl Config {
for trustnet in &self.trustnets { for trustnet in &self.trustnets {
match trustnet.parse() { match trustnet.parse() {
Ok(net) => trustnets.push(net), Ok(net) => trustnets.push(net),
Err(err) => { Err(e) => {
println!("error parsing {trustnet}, error: {err}"); println!("error parsing {trustnet}, error: {e}");
} }
}; };
} }
@ -427,13 +430,7 @@ impl Config {
msgtype: String::from("bootstrap"), msgtype: String::from("bootstrap"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: gethostname(true), hostname: gethostname(true),
ipdata: IpData { ipdata: None,
t: 4,
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
},
} }
} }
} }
@ -521,13 +518,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: Some(IpData {
t: 4, t: 4,
ip: "1.1.1.1".to_string(), ip: "1.1.1.1".to_string(),
hostname: "test1".to_string(), hostname: "test1".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "ssh".to_string(), src: "ssh".to_string(),
}, }),
}) })
.await; .await;
} }
@ -537,13 +534,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: Some(IpData {
t: 4, t: 4,
ip: "1.1.1.2".to_string(), ip: "1.1.1.2".to_string(),
hostname: "test2".to_string(), hostname: "test2".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}, }),
}) })
.await; .await;
} }
@ -552,13 +549,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: Some(IpData {
t: 4, t: 4,
ip: "1.1.1.3".to_string(), ip: "1.1.1.3".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}, }),
}) })
.await; .await;
@ -566,13 +563,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: Some(IpData {
t: 4, t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}, }),
}) })
.await; .await;
@ -580,26 +577,26 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: Some(IpData {
t: 4, t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}, }),
}) })
.await; .await;
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: Some(IpData {
t: 6, t: 6,
ip: "2a00:1450:4007:805::2003".to_string(), ip: "2a00:1450:4007:805::2003".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}, }),
}) })
.await; .await;

View File

@ -21,7 +21,6 @@ pub fn fwglobalinit<'a>() -> ((Batch, Table), (Batch, Table)) {
macro_rules! initrules { macro_rules! initrules {
($batch:expr, $table:expr, $chain:ident) => { ($batch:expr, $table:expr, $chain:ident) => {
let mut $chain = Chain::new(&CString::new(PKG_NAME).unwrap(), &$table);
$chain.set_hook(nftnl::Hook::In, 1); $chain.set_hook(nftnl::Hook::In, 1);
$chain.set_policy(nftnl::Policy::Accept); $chain.set_policy(nftnl::Policy::Accept);
@ -29,13 +28,11 @@ macro_rules! initrules {
$batch.add(&Rule::new(&$chain), nftnl::MsgType::Del); $batch.add(&Rule::new(&$chain), nftnl::MsgType::Del);
let mut rule = Rule::new(&$chain); let mut rule = Rule::new(&$chain);
rule.add_expr(&nft_expr!(ct state)); rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32)); rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32)); rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter)); rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept)); rule.add_expr(&nft_expr!(verdict accept));
$batch.add(&rule, nftnl::MsgType::Add); $batch.add(&rule, nftnl::MsgType::Add);
}; };
} }
@ -86,17 +83,33 @@ fn fwinit(t: FwTableType) -> (Batch, Table) {
} }
pub fn fwblock( pub fn fwblock(
ips_add: &Vec<IpData>, ips_add_all: &Vec<IpData>,
ret: &mut Vec<String>, ret: &mut Vec<String>,
fwlen: &mut usize, fwlen: &mut usize,
) -> std::result::Result<(), Error> { ) -> std::result::Result<(), Error> {
let ((mut batch4, table4), (mut batch6, table6)) = fwglobalinit(); let ((mut batch4, table4), (mut batch6, table6)) = fwglobalinit();
let mut chain4 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table4);
let mut chain6 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table6);
initrules!(batch4, table4, chain4); initrules!(batch4, table4, chain4);
initrules!(batch6, table6, chain6); initrules!(batch6, table6, chain6);
let mut factor = 1;
if ips_add_all.len() > 100 {
factor = (ips_add_all.len() / 10) as usize
}
let ips_add_tmp: Vec<IpData> = ips_add_all.clone().iter().map(|x| x.clone()).collect();
let mut ips_add_iter = ips_add_tmp.chunks(factor);
let mut ips_add: Vec<&[IpData]> = vec![];
while let Some(x) = ips_add_iter.next() {
ips_add.push(x);
}
// build and add rules // build and add rules
for ipdata in ips_add.clone() { for ipdata_group in ips_add.clone() {
for ipdata in ipdata_group {
match ipdata.t { match ipdata.t {
4 => { 4 => {
createrules!(ipdata, chain4, batch4, Ipv4Addr, ipv4); createrules!(ipdata, chain4, batch4, Ipv4Addr, ipv4);
@ -107,16 +120,25 @@ pub fn fwblock(
_ => {} _ => {}
} }
} }
}
// validate and send batch // validate and send batch
for b in [batch4, batch6] { for b in [batch4, batch6] {
let bf = b.finalize(); let bf = b.finalize();
send_and_process(&bf).unwrap(); match send_and_process(&bf) {
Ok(_) => {}
Err(e) => {
println!("error sending batch: {e}");
} }
if fwlen != &mut ips_add.len() { };
ret.push(format!("{length} ip in firewall", length = ips_add.len()));
} }
*fwlen = ips_add.len(); if fwlen != &mut ips_add_all.len() {
ret.push(format!(
"{length} ip in firewall",
length = ips_add_all.len()
));
}
*fwlen = ips_add_all.len();
Ok(()) Ok(())
} }

View File

@ -22,7 +22,7 @@ pub struct IpEvent {
pub msgtype: String, pub msgtype: String,
pub mode: String, pub mode: String,
pub hostname: String, pub hostname: String,
pub ipdata: IpData, pub ipdata: Option<IpData>,
} }
#[macro_export] #[macro_export]
@ -35,6 +35,14 @@ macro_rules! ipevent {
ipdata: $ipdata, ipdata: $ipdata,
} }
}; };
($msgtype:expr,$mode:expr,$hostname:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: None,
}
};
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]

View File

@ -11,6 +11,7 @@ use chrono::prelude::*;
use chrono::prelude::{DateTime, Local}; use chrono::prelude::{DateTime, Local};
use chrono::Duration; use chrono::Duration;
use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent}; use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent};
use sd_notify::*;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
@ -21,6 +22,13 @@ const BL_CHAN_SIZE: usize = 32;
const WS_CHAN_SIZE: usize = 64; const WS_CHAN_SIZE: usize = 64;
const LOOP_MAX_WAIT: u64 = 5; const LOOP_MAX_WAIT: u64 = 5;
macro_rules! log_with_systemd {
($msg:expr) => {
println!("{}", $msg);
notify(false, &[NotifyState::Status(format!("{}", $msg).as_str())]).unwrap();
};
}
pub async fn run() { pub async fn run() {
let inotify = Inotify::init(InitFlags::empty()).unwrap(); let inotify = Inotify::init(InitFlags::empty()).unwrap();
let globalctx = Context::new(&inotify).await; let globalctx = Context::new(&inotify).await;
@ -31,7 +39,7 @@ pub async fn run() {
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0); let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
println!("Launching {}, version {}", PKG_NAME, pkgversion); log_with_systemd!(format!("Launching {}, version {}", PKG_NAME, pkgversion));
fwglobalinit(); fwglobalinit();
let ctxapi = Arc::clone(&ctxarc); let ctxapi = Arc::clone(&ctxarc);
@ -60,6 +68,8 @@ pub async fn run() {
compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await; compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
}); });
notify(false, &[NotifyState::Ready]).unwrap();
loop { loop {
let mut ret: Vec<String> = Vec::new(); let mut ret: Vec<String> = Vec::new();
@ -76,8 +86,9 @@ pub async fn run() {
if received_ip.msgtype == "bootstrap".to_string() { if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in toblock { for ip_to_send in toblock {
let ipe = ipevent!("init","ws",gethostname(true),ip_to_send); let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
break; break;
} }
@ -93,18 +104,24 @@ pub async fn run() {
// send ip list to api and ws sockets // send ip list to api and ws sockets
if let Some(ipevent) = filtered_ipevent { if let Some(ipevent) = filtered_ipevent {
if received_ip.msgtype != "init" { if received_ip.msgtype != "init" {
println!("sending {} to api and ws", ipevent.ipdata.ip); log_with_systemd!(format!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip));
let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata); let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata);
send_to_ipbl_api(&server.clone(), &ipe).await; send_to_ipbl_api(&server.clone(), &ipe).await;
let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
if !status { wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
continue; continue;
} }
} }
} }
} }
_val = sleep_s(LOOP_MAX_WAIT) => {} _val = sleep_s(LOOP_MAX_WAIT) => {
let ipe = ipevent!("ping", "ws", gethostname(true));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
}
}
}; };
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
@ -112,7 +129,8 @@ pub async fn run() {
// log lines // log lines
if ret.len() > 0 { if ret.len() > 0 {
println!("{ret}", ret = ret.join(", ")); let result = ret.join(", ");
log_with_systemd!(format!("{result}"));
} }
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
@ -128,8 +146,26 @@ async fn handle_cfg_reload(
) { ) {
let now_cfg_reload = Local::now().trunc_subsecs(0); let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) { if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) {
let inotify = inoarc.read().await; let inotify;
match ctxclone.write().await.load(&inotify).await { loop {
inotify = match inoarc.try_read() {
Ok(o) => o,
Err(e) => {
println!("{e}");
sleep_s(1).await;
continue;
}
};
break;
}
let mut ctxtest = match ctxclone.try_write() {
Ok(o) => o,
Err(e) => {
println!("{e}");
return;
}
};
match ctxtest.load(&inotify).await {
Ok(_) => { Ok(_) => {
*last_cfg_reload = Local::now().trunc_subsecs(0); *last_cfg_reload = Local::now().trunc_subsecs(0);
} }
@ -153,8 +189,8 @@ async fn handle_fwblock(ctxclone: Arc<RwLock<Context>>, ret: &mut Vec<String>, f
// apply firewall blocking // apply firewall blocking
match fwblock(&toblock, ret, fwlen) { match fwblock(&toblock, ret, fwlen) {
Ok(_) => {} Ok(_) => {}
Err(err) => { Err(e) => {
println!("Err: {err}, unable to push firewall rules, use super user") println!("err: {e}, unable to push firewall rules, use super user")
} }
}; };
} }
@ -205,8 +241,7 @@ async fn compare_files_changes(
match modfiles.inevent.name { match modfiles.inevent.name {
Some(name) => { Some(name) => {
let filename = name.to_str().unwrap(); let filename = name.to_str().unwrap();
for sak in sas.clone().keys() { for (sak, sa) in sas.clone().iter_mut() {
let sa = sas.get(sak).unwrap();
if modfiles.inevent.wd == sa.wd { if modfiles.inevent.wd == sa.wd {
let handle: String; let handle: String;
if sa.filename.as_str() == "" { if sa.filename.as_str() == "" {
@ -244,7 +279,7 @@ async fn compare_files_changes(
} }
} }
for ip in iplist { for ip in iplist {
let ipe = ipevent!("add", "file", gethostname(true), ip); let ipe = ipevent!("add", "file", gethostname(true), Some(ip));
let ipetx = ipeventtx.read().await; let ipetx = ipeventtx.read().await;
ipetx.send(ipe).await.unwrap(); ipetx.send(ipe).await.unwrap();
} }

View File

@ -10,7 +10,13 @@ use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> { pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxarc = ctxarc.clone(); let ctxarc = ctxarc.clone();
let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() }; let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() };
let listener = TcpListener::bind(addr).await.unwrap(); let listener = match TcpListener::bind(addr).await {
Ok(o) => o,
Err(e) => {
println!("error: {e}");
std::process::exit(1);
}
};
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
@ -23,13 +29,13 @@ pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
match socket.try_read(&mut buf) { match socket.try_read(&mut buf) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("{e}"); println!("error: {e}");
continue; continue;
} }
}; };
} }
Err(e) => { Err(e) => {
println!("{e}"); println!("error: {e}");
continue; continue;
} }
} }
@ -43,8 +49,8 @@ pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
match socket.write_all(res.as_bytes()).await { match socket.write_all(res.as_bytes()).await {
Ok(_) => {} Ok(_) => {}
Err(err) => { Err(e) => {
println!("ee {err}"); println!("error: {e}");
} }
} }
} }

View File

@ -6,9 +6,9 @@ use tokio::time::{sleep, Duration};
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> { pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
let mut file = match File::open(filename) { let mut file = match File::open(filename) {
Ok(f) => f, Ok(o) => o,
Err(err) => { Err(e) => {
println!("{err}"); println!("error: {e}");
return None; return None;
} }
}; };
@ -21,6 +21,11 @@ pub async fn sleep_s(s: u64) {
sleep(Duration::from_secs(s)).await; sleep(Duration::from_secs(s)).await;
} }
#[allow(dead_code)]
pub async fn sleep_ms(m: u64) {
sleep(Duration::from_millis(m)).await;
}
pub fn gethostname(show_fqdn: bool) -> String { pub fn gethostname(show_fqdn: bool) -> String {
let hostname_cstr = unistd::gethostname().expect("Failed getting hostname"); let hostname_cstr = unistd::gethostname().expect("Failed getting hostname");
let fqdn = hostname_cstr let fqdn = hostname_cstr

View File

@ -11,12 +11,12 @@ pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut try_req = 0; let mut try_req = 0;
let client = httpclient(); let client = httpclient();
loop { loop {
match push_ip(&client, &server, &ip.ipdata).await { match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await {
Ok(_) => { Ok(_) => {
break; break;
} }
Err(err) => { Err(e) => {
println!("{err}"); println!("error: {e}");
sleep_s(1).await; sleep_s(1).await;
if try_req == MAX_FAILED_API_RATE { if try_req == MAX_FAILED_API_RATE {
break; break;

View File

@ -14,13 +14,13 @@ use tungstenite::*;
pub async fn websocketreqrep( pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> { ) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event, cfg); let (mut wssocketrr, bootstrap_event, wscfg);
{ {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
bootstrap_event = ctx.cfg.bootstrap_event().clone(); bootstrap_event = ctx.cfg.bootstrap_event().clone();
cfg = ctx.cfg.ws.get("reqrep").unwrap().clone(); wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
} }
wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap(); wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr; return wssocketrr;
@ -45,17 +45,26 @@ pub async fn websocketpubsub(
Ok(msg) => { Ok(msg) => {
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) { let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
Ok(o) => o, Ok(o) => o,
Err(_e) => { Err(e) => {
println!("error in pubsub: {e:?}");
continue; continue;
} }
}; };
if tosend.ipdata.hostname != gethostname(true) match tosend.ipdata.clone() {
Some(o) => {
if o.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string() || tosend.msgtype == "init".to_string()
{ {
let txps = txpubsub.read().await; let txps = txpubsub.read().await;
txps.send(tosend).await.unwrap(); txps.send(tosend).await.unwrap();
} }
} }
None => {
let txps = txpubsub.read().await;
txps.send(tosend.clone()).await.unwrap();
}
}
}
Err(e) => { Err(e) => {
println!("error in pubsub: {e:?}"); println!("error in pubsub: {e:?}");
ws.close(None).unwrap(); ws.close(None).unwrap();
@ -105,11 +114,12 @@ pub async fn send_to_ipbl_websocket(
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err send read: {e:?}"); println!("err send read: {e:?}");
return handle_websocket_error(ws); return false;
} }
}; };
} else { } else {
return handle_websocket_error(ws); println!("can't write to socket");
return false;
}; };
if ws.can_read() { if ws.can_read() {
@ -117,16 +127,13 @@ pub async fn send_to_ipbl_websocket(
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err send read: {e:?}"); println!("err send read: {e:?}");
return handle_websocket_error(ws); return false;
} }
}; };
} else { } else {
return handle_websocket_error(ws); println!("can't read from socket");
return false;
}; };
true true
} }
fn handle_websocket_error(ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> bool {
ws.close(None).unwrap();
return false;
}