Compare commits

..

No commits in common. "master" and "1.6.2" have entirely different histories.

10 changed files with 482 additions and 723 deletions

827
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package] [package]
name = "ipblc" name = "ipblc"
version = "1.7.0" version = "1.6.0"
edition = "2021" edition = "2021"
authors = ["PaulBSD <paul@paulbsd.com>"] authors = ["PaulBSD <paul@paulbsd.com>"]
description = "ipblc is a tool that search and send attacking ip addresses to ipbl" description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
@ -21,9 +21,8 @@ regex = "1.10"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sd-notify = { version = "0.4" } tokio = { version = "1.34", features = ["full", "sync"] }
tokio = { version = "1.35", features = ["full", "sync"] } tungstenite = { version = "0.20", features = ["handshake", "rustls-tls-native-roots"] }
tungstenite = { version = "0.21", features = ["handshake", "rustls-tls-native-roots"] }
## to optimize binary size (slow compile time) ## to optimize binary size (slow compile time)
#[profile.release] #[profile.release]

View File

@ -17,7 +17,7 @@ use std::path::Path;
pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]); pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]);
const MASTERSERVER: &str = "ipbl.paulbsd.com"; const MASTERSERVER: &str = "ipbl.paulbsd.com";
const WSSUBSCRIPTION: &str = "ipbl"; const WSSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY_INTERVAL: u64 = 2; const CONFIG_RETRY: u64 = 2;
const WEB_CLIENT_TIMEOUT: i64 = 5; const WEB_CLIENT_TIMEOUT: i64 = 5;
#[derive(Debug)] #[derive(Debug)]
@ -103,12 +103,12 @@ impl Context {
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: Discovery = match req.json().await { let data: Discovery = match req.json().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
Ok(data) Ok(data)
} }
@ -127,10 +127,10 @@ impl Context {
} }
break; break;
} }
Err(e) => { Err(err) => {
println!("error loading config: {e}, retrying in {CONFIG_RETRY_INTERVAL}s"); println!("error loading config: {err}, retrying in {CONFIG_RETRY}s");
last_in_err = true; last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await; sleep_s(CONFIG_RETRY).await;
} }
}; };
} }
@ -169,41 +169,38 @@ impl Context {
} }
pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> { pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> {
match &ipevent.ipdata { match self.cfg.sets.get(&ipevent.ipdata.src) {
Some(ipdata) => match self.cfg.sets.get(&ipdata.src) { Some(set) => {
Some(set) => { let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str())
let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str()) .unwrap()
.unwrap() .with_timezone(&chrono::Local);
.with_timezone(&chrono::Local); let blocktime = set.blocktime;
let blocktime = set.blocktime; if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname { let block = self
let block = .blocklist
self.blocklist .entry(ipevent.ipdata.ip.to_string())
.entry(ipdata.ip.to_string()) .or_insert(BlockIpData {
.or_insert(BlockIpData { ipdata: ipevent.ipdata.clone(),
ipdata: ipdata.clone(), tryfail: 0,
tryfail: 0, starttime,
starttime, blocktime,
blocktime, });
}); block.tryfail += 1;
block.tryfail += 1; block.blocktime = blocktime;
block.blocktime = blocktime; if block.tryfail >= set.tryfail {
if block.tryfail >= set.tryfail { return Some(ipevent.clone());
return Some(ipevent.clone());
}
} else {
self.blocklist
.entry(ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
});
} }
} else {
self.blocklist
.entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipevent.ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
});
} }
None => {} }
},
None => {} None => {}
} }
None None
@ -365,12 +362,12 @@ impl Config {
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await { let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
for d in data.sets { for d in data.sets {
@ -400,13 +397,13 @@ impl Config {
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await { let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
Ok(data) Ok(data)
@ -417,8 +414,8 @@ impl Config {
for trustnet in &self.trustnets { for trustnet in &self.trustnets {
match trustnet.parse() { match trustnet.parse() {
Ok(net) => trustnets.push(net), Ok(net) => trustnets.push(net),
Err(e) => { Err(err) => {
println!("error parsing {trustnet}, error: {e}"); println!("error parsing {trustnet}, error: {err}");
} }
}; };
} }
@ -430,7 +427,13 @@ impl Config {
msgtype: String::from("bootstrap"), msgtype: String::from("bootstrap"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: gethostname(true), hostname: gethostname(true),
ipdata: None, ipdata: IpData {
t: 4,
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
},
} }
} }
} }
@ -518,13 +521,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4, t: 4,
ip: "1.1.1.1".to_string(), ip: "1.1.1.1".to_string(),
hostname: "test1".to_string(), hostname: "test1".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "ssh".to_string(), src: "ssh".to_string(),
}), },
}) })
.await; .await;
} }
@ -534,13 +537,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4, t: 4,
ip: "1.1.1.2".to_string(), ip: "1.1.1.2".to_string(),
hostname: "test2".to_string(), hostname: "test2".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
} }
@ -549,13 +552,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4, t: 4,
ip: "1.1.1.3".to_string(), ip: "1.1.1.3".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
@ -563,13 +566,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4, t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
@ -577,26 +580,26 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4, t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 6, t: 6,
ip: "2a00:1450:4007:805::2003".to_string(), ip: "2a00:1450:4007:805::2003".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;

View File

@ -21,6 +21,7 @@ pub fn fwglobalinit<'a>() -> ((Batch, Table), (Batch, Table)) {
macro_rules! initrules { macro_rules! initrules {
($batch:expr, $table:expr, $chain:ident) => { ($batch:expr, $table:expr, $chain:ident) => {
let mut $chain = Chain::new(&CString::new(PKG_NAME).unwrap(), &$table);
$chain.set_hook(nftnl::Hook::In, 1); $chain.set_hook(nftnl::Hook::In, 1);
$chain.set_policy(nftnl::Policy::Accept); $chain.set_policy(nftnl::Policy::Accept);
@ -28,11 +29,13 @@ macro_rules! initrules {
$batch.add(&Rule::new(&$chain), nftnl::MsgType::Del); $batch.add(&Rule::new(&$chain), nftnl::MsgType::Del);
let mut rule = Rule::new(&$chain); let mut rule = Rule::new(&$chain);
rule.add_expr(&nft_expr!(ct state)); rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32)); rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32)); rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter)); rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept)); rule.add_expr(&nft_expr!(verdict accept));
$batch.add(&rule, nftnl::MsgType::Add); $batch.add(&rule, nftnl::MsgType::Add);
}; };
} }
@ -83,62 +86,37 @@ fn fwinit(t: FwTableType) -> (Batch, Table) {
} }
pub fn fwblock( pub fn fwblock(
ips_add_all: &Vec<IpData>, ips_add: &Vec<IpData>,
ret: &mut Vec<String>, ret: &mut Vec<String>,
fwlen: &mut usize, fwlen: &mut usize,
) -> std::result::Result<(), Error> { ) -> std::result::Result<(), Error> {
let ((mut batch4, table4), (mut batch6, table6)) = fwglobalinit(); let ((mut batch4, table4), (mut batch6, table6)) = fwglobalinit();
let mut chain4 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table4);
let mut chain6 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table6);
initrules!(batch4, table4, chain4); initrules!(batch4, table4, chain4);
initrules!(batch6, table6, chain6); initrules!(batch6, table6, chain6);
let mut factor = 1;
if ips_add_all.len() > 100 {
factor = (ips_add_all.len() / 10) as usize
}
let ips_add_tmp: Vec<IpData> = ips_add_all.clone().iter().map(|x| x.clone()).collect();
let mut ips_add_iter = ips_add_tmp.chunks(factor);
let mut ips_add: Vec<&[IpData]> = vec![];
while let Some(x) = ips_add_iter.next() {
ips_add.push(x);
}
// build and add rules // build and add rules
for ipdata_group in ips_add.clone() { for ipdata in ips_add.clone() {
for ipdata in ipdata_group { match ipdata.t {
match ipdata.t { 4 => {
4 => { createrules!(ipdata, chain4, batch4, Ipv4Addr, ipv4);
createrules!(ipdata, chain4, batch4, Ipv4Addr, ipv4);
}
6 => {
createrules!(ipdata, chain6, batch6, Ipv6Addr, ipv6);
}
_ => {}
} }
6 => {
createrules!(ipdata, chain6, batch6, Ipv6Addr, ipv6);
}
_ => {}
} }
} }
// validate and send batch // validate and send batch
for b in [batch4, batch6] { for b in [batch4, batch6] {
let bf = b.finalize(); let bf = b.finalize();
match send_and_process(&bf) { send_and_process(&bf).unwrap();
Ok(_) => {}
Err(e) => {
println!("error sending batch: {e}");
}
};
} }
if fwlen != &mut ips_add_all.len() { if fwlen != &mut ips_add.len() {
ret.push(format!( ret.push(format!("{length} ip in firewall", length = ips_add.len()));
"{length} ip in firewall",
length = ips_add_all.len()
));
} }
*fwlen = ips_add_all.len(); *fwlen = ips_add.len();
Ok(()) Ok(())
} }

View File

@ -22,7 +22,7 @@ pub struct IpEvent {
pub msgtype: String, pub msgtype: String,
pub mode: String, pub mode: String,
pub hostname: String, pub hostname: String,
pub ipdata: Option<IpData>, pub ipdata: IpData,
} }
#[macro_export] #[macro_export]
@ -35,14 +35,6 @@ macro_rules! ipevent {
ipdata: $ipdata, ipdata: $ipdata,
} }
}; };
($msgtype:expr,$mode:expr,$hostname:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: None,
}
};
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]

View File

@ -11,7 +11,6 @@ use chrono::prelude::*;
use chrono::prelude::{DateTime, Local}; use chrono::prelude::{DateTime, Local};
use chrono::Duration; use chrono::Duration;
use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent}; use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent};
use sd_notify::*;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
@ -22,13 +21,6 @@ const BL_CHAN_SIZE: usize = 32;
const WS_CHAN_SIZE: usize = 64; const WS_CHAN_SIZE: usize = 64;
const LOOP_MAX_WAIT: u64 = 5; const LOOP_MAX_WAIT: u64 = 5;
macro_rules! log_with_systemd {
($msg:expr) => {
println!("{}", $msg);
notify(false, &[NotifyState::Status(format!("{}", $msg).as_str())]).unwrap();
};
}
pub async fn run() { pub async fn run() {
let inotify = Inotify::init(InitFlags::empty()).unwrap(); let inotify = Inotify::init(InitFlags::empty()).unwrap();
let globalctx = Context::new(&inotify).await; let globalctx = Context::new(&inotify).await;
@ -39,7 +31,7 @@ pub async fn run() {
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0); let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
log_with_systemd!(format!("Launching {}, version {}", PKG_NAME, pkgversion)); println!("Launching {}, version {}", PKG_NAME, pkgversion);
fwglobalinit(); fwglobalinit();
let ctxapi = Arc::clone(&ctxarc); let ctxapi = Arc::clone(&ctxarc);
@ -68,8 +60,6 @@ pub async fn run() {
compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await; compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
}); });
notify(false, &[NotifyState::Ready]).unwrap();
loop { loop {
let mut ret: Vec<String> = Vec::new(); let mut ret: Vec<String> = Vec::new();
@ -86,9 +76,8 @@ pub async fn run() {
if received_ip.msgtype == "bootstrap".to_string() { if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in toblock { for ip_to_send in toblock {
let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send)); let ipe = ipevent!("init","ws",gethostname(true),ip_to_send);
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
break; break;
} }
@ -104,24 +93,18 @@ pub async fn run() {
// send ip list to api and ws sockets // send ip list to api and ws sockets
if let Some(ipevent) = filtered_ipevent { if let Some(ipevent) = filtered_ipevent {
if received_ip.msgtype != "init" { if received_ip.msgtype != "init" {
log_with_systemd!(format!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip)); println!("sending {} to api and ws", ipevent.ipdata.ip);
let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata); let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata);
send_to_ipbl_api(&server.clone(), &ipe).await; send_to_ipbl_api(&server.clone(), &ipe).await;
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
wssocketrr.close(None).unwrap(); if !status {
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
continue; continue;
} }
} }
} }
} }
_val = sleep_s(LOOP_MAX_WAIT) => { _val = sleep_s(LOOP_MAX_WAIT) => {}
let ipe = ipevent!("ping", "ws", gethostname(true));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
}
}
}; };
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
@ -129,8 +112,7 @@ pub async fn run() {
// log lines // log lines
if ret.len() > 0 { if ret.len() > 0 {
let result = ret.join(", "); println!("{ret}", ret = ret.join(", "));
log_with_systemd!(format!("{result}"));
} }
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
@ -146,26 +128,8 @@ async fn handle_cfg_reload(
) { ) {
let now_cfg_reload = Local::now().trunc_subsecs(0); let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) { if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) {
let inotify; let inotify = inoarc.read().await;
loop { match ctxclone.write().await.load(&inotify).await {
inotify = match inoarc.try_read() {
Ok(o) => o,
Err(e) => {
println!("{e}");
sleep_s(1).await;
continue;
}
};
break;
}
let mut ctxtest = match ctxclone.try_write() {
Ok(o) => o,
Err(e) => {
println!("{e}");
return;
}
};
match ctxtest.load(&inotify).await {
Ok(_) => { Ok(_) => {
*last_cfg_reload = Local::now().trunc_subsecs(0); *last_cfg_reload = Local::now().trunc_subsecs(0);
} }
@ -189,8 +153,8 @@ async fn handle_fwblock(ctxclone: Arc<RwLock<Context>>, ret: &mut Vec<String>, f
// apply firewall blocking // apply firewall blocking
match fwblock(&toblock, ret, fwlen) { match fwblock(&toblock, ret, fwlen) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(err) => {
println!("err: {e}, unable to push firewall rules, use super user") println!("Err: {err}, unable to push firewall rules, use super user")
} }
}; };
} }
@ -241,7 +205,8 @@ async fn compare_files_changes(
match modfiles.inevent.name { match modfiles.inevent.name {
Some(name) => { Some(name) => {
let filename = name.to_str().unwrap(); let filename = name.to_str().unwrap();
for (sak, sa) in sas.clone().iter_mut() { for sak in sas.clone().keys() {
let sa = sas.get(sak).unwrap();
if modfiles.inevent.wd == sa.wd { if modfiles.inevent.wd == sa.wd {
let handle: String; let handle: String;
if sa.filename.as_str() == "" { if sa.filename.as_str() == "" {
@ -279,7 +244,7 @@ async fn compare_files_changes(
} }
} }
for ip in iplist { for ip in iplist {
let ipe = ipevent!("add", "file", gethostname(true), Some(ip)); let ipe = ipevent!("add", "file", gethostname(true), ip);
let ipetx = ipeventtx.read().await; let ipetx = ipeventtx.read().await;
ipetx.send(ipe).await.unwrap(); ipetx.send(ipe).await.unwrap();
} }

View File

@ -4,42 +4,41 @@ use serde_json;
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener; use tokio::net::TcpSocket;
use tokio::sync::RwLock; use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> { pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxarc = ctxarc.clone(); let ctxarc = ctxarc.clone();
let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() }; let addr = { ctxarc.read().await.cfg.api.parse().unwrap() };
let listener = match TcpListener::bind(addr).await {
Ok(o) => o, let socket = TcpSocket::new_v4().unwrap();
Err(e) => { match socket.bind(addr) {
println!("error: {e}"); Ok(_) => {}
Err(_) => {
println!("can't bind monitoring socket, exiting...");
std::process::exit(1); std::process::exit(1);
} }
}; }
socket.set_reuseaddr(true).unwrap();
socket
.set_linger(Some(std::time::Duration::from_secs(0)))
.unwrap();
let listener = socket.listen(128).unwrap();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
match listener.accept().await { match listener.accept().await {
Ok((mut socket, _addr)) => { Ok((stream, _addr)) => {
let mut buf = vec![0; 1024]; stream.readable().await.unwrap();
let (reader, mut writer) = stream.into_split();
let mut buf: [u8; 16] = [0; 16];
match socket.readable().await { match reader.try_read(&mut buf) {
Ok(_) => { Ok(_) => {}
match socket.try_read(&mut buf) {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
continue;
}
};
}
Err(e) => { Err(e) => {
println!("error: {e}"); println!("error: {}", e);
continue;
} }
} };
let msg = match String::from_utf8(buf.to_vec()) { let msg = match String::from_utf8(buf.to_vec()) {
Ok(o) => o.trim_matches(char::from(0)).trim().to_string(), Ok(o) => o.trim_matches(char::from(0)).trim().to_string(),
Err(_) => "".to_string(), Err(_) => "".to_string(),
@ -47,15 +46,15 @@ pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let res = format_result(&ctxarc, msg.as_str()).await; let res = format_result(&ctxarc, msg.as_str()).await;
match socket.write_all(res.as_bytes()).await { match writer.write_all(format!("{res}").as_bytes()).await {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(err) => {
println!("error: {e}"); println!("ee {err}");
} }
} }
} }
Err(err) => { Err(err) => {
println!("error: {err}"); println!("unable to serialize data: {err}");
} }
} }
} }

View File

@ -6,9 +6,9 @@ use tokio::time::{sleep, Duration};
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> { pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
let mut file = match File::open(filename) { let mut file = match File::open(filename) {
Ok(o) => o, Ok(f) => f,
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
return None; return None;
} }
}; };
@ -21,11 +21,6 @@ pub async fn sleep_s(s: u64) {
sleep(Duration::from_secs(s)).await; sleep(Duration::from_secs(s)).await;
} }
#[allow(dead_code)]
pub async fn sleep_ms(m: u64) {
sleep(Duration::from_millis(m)).await;
}
pub fn gethostname(show_fqdn: bool) -> String { pub fn gethostname(show_fqdn: bool) -> String {
let hostname_cstr = unistd::gethostname().expect("Failed getting hostname"); let hostname_cstr = unistd::gethostname().expect("Failed getting hostname");
let fqdn = hostname_cstr let fqdn = hostname_cstr

View File

@ -11,12 +11,12 @@ pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut try_req = 0; let mut try_req = 0;
let client = httpclient(); let client = httpclient();
loop { loop {
match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await { match push_ip(&client, &server, &ip.ipdata).await {
Ok(_) => { Ok(_) => {
break; break;
} }
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
sleep_s(1).await; sleep_s(1).await;
if try_req == MAX_FAILED_API_RATE { if try_req == MAX_FAILED_API_RATE {
break; break;

View File

@ -14,13 +14,13 @@ use tungstenite::*;
pub async fn websocketreqrep( pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> { ) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event, wscfg); let (mut wssocketrr, bootstrap_event, cfg);
{ {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
bootstrap_event = ctx.cfg.bootstrap_event().clone(); bootstrap_event = ctx.cfg.bootstrap_event().clone();
wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone(); cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
} }
wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap(); wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr; return wssocketrr;
@ -45,24 +45,15 @@ pub async fn websocketpubsub(
Ok(msg) => { Ok(msg) => {
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) { let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
Ok(o) => o, Ok(o) => o,
Err(e) => { Err(_e) => {
println!("error in pubsub: {e:?}");
continue; continue;
} }
}; };
match tosend.ipdata.clone() { if tosend.ipdata.hostname != gethostname(true)
Some(o) => { || tosend.msgtype == "init".to_string()
if o.hostname != gethostname(true) {
|| tosend.msgtype == "init".to_string() let txps = txpubsub.read().await;
{ txps.send(tosend).await.unwrap();
let txps = txpubsub.read().await;
txps.send(tosend).await.unwrap();
}
}
None => {
let txps = txpubsub.read().await;
txps.send(tosend.clone()).await.unwrap();
}
} }
} }
Err(e) => { Err(e) => {
@ -114,12 +105,11 @@ pub async fn send_to_ipbl_websocket(
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err send read: {e:?}"); println!("err send read: {e:?}");
return false; return handle_websocket_error(ws);
} }
}; };
} else { } else {
println!("can't write to socket"); return handle_websocket_error(ws);
return false;
}; };
if ws.can_read() { if ws.can_read() {
@ -127,13 +117,16 @@ pub async fn send_to_ipbl_websocket(
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err send read: {e:?}"); println!("err send read: {e:?}");
return false; return handle_websocket_error(ws);
} }
}; };
} else { } else {
println!("can't read from socket"); return handle_websocket_error(ws);
return false;
}; };
true true
} }
fn handle_websocket_error(ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> bool {
ws.close(None).unwrap();
return false;
}