Compare commits
No commits in common. "master" and "1.6.2" have entirely different histories.
827
Cargo.lock
generated
827
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "ipblc"
|
||||
version = "1.7.0"
|
||||
version = "1.6.0"
|
||||
edition = "2021"
|
||||
authors = ["PaulBSD <paul@paulbsd.com>"]
|
||||
description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
|
||||
@ -21,9 +21,8 @@ regex = "1.10"
|
||||
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
sd-notify = { version = "0.4" }
|
||||
tokio = { version = "1.35", features = ["full", "sync"] }
|
||||
tungstenite = { version = "0.21", features = ["handshake", "rustls-tls-native-roots"] }
|
||||
tokio = { version = "1.34", features = ["full", "sync"] }
|
||||
tungstenite = { version = "0.20", features = ["handshake", "rustls-tls-native-roots"] }
|
||||
|
||||
## to optimize binary size (slow compile time)
|
||||
#[profile.release]
|
||||
|
131
src/config.rs
131
src/config.rs
@ -17,7 +17,7 @@ use std::path::Path;
|
||||
pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]);
|
||||
const MASTERSERVER: &str = "ipbl.paulbsd.com";
|
||||
const WSSUBSCRIPTION: &str = "ipbl";
|
||||
const CONFIG_RETRY_INTERVAL: u64 = 2;
|
||||
const CONFIG_RETRY: u64 = 2;
|
||||
const WEB_CLIENT_TIMEOUT: i64 = 5;
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -103,12 +103,12 @@ impl Context {
|
||||
.send()
|
||||
.await;
|
||||
let req = match resp {
|
||||
Ok(o) => o,
|
||||
Err(e) => return Err(e),
|
||||
Ok(re) => re,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
let data: Discovery = match req.json().await {
|
||||
Ok(o) => o,
|
||||
Err(e) => return Err(e),
|
||||
Ok(res) => res,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
Ok(data)
|
||||
}
|
||||
@ -127,10 +127,10 @@ impl Context {
|
||||
}
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
println!("error loading config: {e}, retrying in {CONFIG_RETRY_INTERVAL}s");
|
||||
Err(err) => {
|
||||
println!("error loading config: {err}, retrying in {CONFIG_RETRY}s");
|
||||
last_in_err = true;
|
||||
sleep_s(CONFIG_RETRY_INTERVAL).await;
|
||||
sleep_s(CONFIG_RETRY).await;
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -169,41 +169,38 @@ impl Context {
|
||||
}
|
||||
|
||||
pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> {
|
||||
match &ipevent.ipdata {
|
||||
Some(ipdata) => match self.cfg.sets.get(&ipdata.src) {
|
||||
Some(set) => {
|
||||
let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str())
|
||||
.unwrap()
|
||||
.with_timezone(&chrono::Local);
|
||||
let blocktime = set.blocktime;
|
||||
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
|
||||
let block =
|
||||
self.blocklist
|
||||
.entry(ipdata.ip.to_string())
|
||||
.or_insert(BlockIpData {
|
||||
ipdata: ipdata.clone(),
|
||||
tryfail: 0,
|
||||
starttime,
|
||||
blocktime,
|
||||
});
|
||||
block.tryfail += 1;
|
||||
block.blocktime = blocktime;
|
||||
if block.tryfail >= set.tryfail {
|
||||
return Some(ipevent.clone());
|
||||
}
|
||||
} else {
|
||||
self.blocklist
|
||||
.entry(ipdata.ip.to_string())
|
||||
.or_insert(BlockIpData {
|
||||
ipdata: ipdata.clone(),
|
||||
tryfail: set.tryfail,
|
||||
starttime,
|
||||
blocktime,
|
||||
});
|
||||
match self.cfg.sets.get(&ipevent.ipdata.src) {
|
||||
Some(set) => {
|
||||
let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str())
|
||||
.unwrap()
|
||||
.with_timezone(&chrono::Local);
|
||||
let blocktime = set.blocktime;
|
||||
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
|
||||
let block = self
|
||||
.blocklist
|
||||
.entry(ipevent.ipdata.ip.to_string())
|
||||
.or_insert(BlockIpData {
|
||||
ipdata: ipevent.ipdata.clone(),
|
||||
tryfail: 0,
|
||||
starttime,
|
||||
blocktime,
|
||||
});
|
||||
block.tryfail += 1;
|
||||
block.blocktime = blocktime;
|
||||
if block.tryfail >= set.tryfail {
|
||||
return Some(ipevent.clone());
|
||||
}
|
||||
} else {
|
||||
self.blocklist
|
||||
.entry(ipevent.ipdata.ip.to_string())
|
||||
.or_insert(BlockIpData {
|
||||
ipdata: ipevent.ipdata.clone(),
|
||||
tryfail: set.tryfail,
|
||||
starttime,
|
||||
blocktime,
|
||||
});
|
||||
}
|
||||
None => {}
|
||||
},
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
None
|
||||
@ -365,12 +362,12 @@ impl Config {
|
||||
.send()
|
||||
.await;
|
||||
let req = match resp {
|
||||
Ok(o) => o,
|
||||
Err(e) => return Err(e),
|
||||
Ok(re) => re,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await {
|
||||
Ok(o) => o,
|
||||
Err(e) => return Err(e),
|
||||
Ok(res) => res,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
for d in data.sets {
|
||||
@ -400,13 +397,13 @@ impl Config {
|
||||
.await;
|
||||
|
||||
let req = match resp {
|
||||
Ok(o) => o,
|
||||
Err(e) => return Err(e),
|
||||
Ok(re) => re,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
|
||||
Ok(o) => o,
|
||||
Err(e) => return Err(e),
|
||||
Ok(res) => res,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
Ok(data)
|
||||
@ -417,8 +414,8 @@ impl Config {
|
||||
for trustnet in &self.trustnets {
|
||||
match trustnet.parse() {
|
||||
Ok(net) => trustnets.push(net),
|
||||
Err(e) => {
|
||||
println!("error parsing {trustnet}, error: {e}");
|
||||
Err(err) => {
|
||||
println!("error parsing {trustnet}, error: {err}");
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -430,7 +427,13 @@ impl Config {
|
||||
msgtype: String::from("bootstrap"),
|
||||
mode: String::from("ws"),
|
||||
hostname: gethostname(true),
|
||||
ipdata: None,
|
||||
ipdata: IpData {
|
||||
t: 4,
|
||||
ip: "".to_string(),
|
||||
src: "".to_string(),
|
||||
date: "".to_string(),
|
||||
hostname: "".to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -518,13 +521,13 @@ mod test {
|
||||
msgtype: String::from("add"),
|
||||
mode: String::from("ws"),
|
||||
hostname: String::from("localhost"),
|
||||
ipdata: Some(IpData {
|
||||
ipdata: IpData {
|
||||
t: 4,
|
||||
ip: "1.1.1.1".to_string(),
|
||||
hostname: "test1".to_string(),
|
||||
date: now.to_rfc3339().to_string(),
|
||||
src: "ssh".to_string(),
|
||||
}),
|
||||
},
|
||||
})
|
||||
.await;
|
||||
}
|
||||
@ -534,13 +537,13 @@ mod test {
|
||||
msgtype: String::from("add"),
|
||||
mode: String::from("ws"),
|
||||
hostname: String::from("localhost"),
|
||||
ipdata: Some(IpData {
|
||||
ipdata: IpData {
|
||||
t: 4,
|
||||
ip: "1.1.1.2".to_string(),
|
||||
hostname: "test2".to_string(),
|
||||
date: now.to_rfc3339().to_string(),
|
||||
src: "http".to_string(),
|
||||
}),
|
||||
},
|
||||
})
|
||||
.await;
|
||||
}
|
||||
@ -549,13 +552,13 @@ mod test {
|
||||
msgtype: String::from("add"),
|
||||
mode: String::from("ws"),
|
||||
hostname: String::from("localhost"),
|
||||
ipdata: Some(IpData {
|
||||
ipdata: IpData {
|
||||
t: 4,
|
||||
ip: "1.1.1.3".to_string(),
|
||||
hostname: "testgood".to_string(),
|
||||
date: now.to_rfc3339().to_string(),
|
||||
src: "http".to_string(),
|
||||
}),
|
||||
},
|
||||
})
|
||||
.await;
|
||||
|
||||
@ -563,13 +566,13 @@ mod test {
|
||||
msgtype: String::from("add"),
|
||||
mode: String::from("ws"),
|
||||
hostname: String::from("localhost"),
|
||||
ipdata: Some(IpData {
|
||||
ipdata: IpData {
|
||||
t: 4,
|
||||
ip: "1.1.1.4".to_string(),
|
||||
hostname: "testgood".to_string(),
|
||||
date: now.to_rfc3339().to_string(),
|
||||
src: "http".to_string(),
|
||||
}),
|
||||
},
|
||||
})
|
||||
.await;
|
||||
|
||||
@ -577,26 +580,26 @@ mod test {
|
||||
msgtype: String::from("add"),
|
||||
mode: String::from("ws"),
|
||||
hostname: String::from("localhost"),
|
||||
ipdata: Some(IpData {
|
||||
ipdata: IpData {
|
||||
t: 4,
|
||||
ip: "1.1.1.4".to_string(),
|
||||
hostname: "testgood".to_string(),
|
||||
date: now.to_rfc3339().to_string(),
|
||||
src: "http".to_string(),
|
||||
}),
|
||||
},
|
||||
})
|
||||
.await;
|
||||
ctx.update_blocklist(&mut IpEvent {
|
||||
msgtype: String::from("add"),
|
||||
mode: String::from("ws"),
|
||||
hostname: String::from("localhost"),
|
||||
ipdata: Some(IpData {
|
||||
ipdata: IpData {
|
||||
t: 6,
|
||||
ip: "2a00:1450:4007:805::2003".to_string(),
|
||||
hostname: "testgood".to_string(),
|
||||
date: now.to_rfc3339().to_string(),
|
||||
src: "http".to_string(),
|
||||
}),
|
||||
},
|
||||
})
|
||||
.await;
|
||||
|
||||
|
54
src/fw.rs
54
src/fw.rs
@ -21,6 +21,7 @@ pub fn fwglobalinit<'a>() -> ((Batch, Table), (Batch, Table)) {
|
||||
|
||||
macro_rules! initrules {
|
||||
($batch:expr, $table:expr, $chain:ident) => {
|
||||
let mut $chain = Chain::new(&CString::new(PKG_NAME).unwrap(), &$table);
|
||||
$chain.set_hook(nftnl::Hook::In, 1);
|
||||
$chain.set_policy(nftnl::Policy::Accept);
|
||||
|
||||
@ -28,11 +29,13 @@ macro_rules! initrules {
|
||||
$batch.add(&Rule::new(&$chain), nftnl::MsgType::Del);
|
||||
|
||||
let mut rule = Rule::new(&$chain);
|
||||
|
||||
rule.add_expr(&nft_expr!(ct state));
|
||||
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
|
||||
rule.add_expr(&nft_expr!(cmp != 0u32));
|
||||
rule.add_expr(&nft_expr!(counter));
|
||||
rule.add_expr(&nft_expr!(verdict accept));
|
||||
|
||||
$batch.add(&rule, nftnl::MsgType::Add);
|
||||
};
|
||||
}
|
||||
@ -83,62 +86,37 @@ fn fwinit(t: FwTableType) -> (Batch, Table) {
|
||||
}
|
||||
|
||||
pub fn fwblock(
|
||||
ips_add_all: &Vec<IpData>,
|
||||
ips_add: &Vec<IpData>,
|
||||
ret: &mut Vec<String>,
|
||||
fwlen: &mut usize,
|
||||
) -> std::result::Result<(), Error> {
|
||||
let ((mut batch4, table4), (mut batch6, table6)) = fwglobalinit();
|
||||
|
||||
let mut chain4 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table4);
|
||||
let mut chain6 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table6);
|
||||
|
||||
initrules!(batch4, table4, chain4);
|
||||
initrules!(batch6, table6, chain6);
|
||||
|
||||
let mut factor = 1;
|
||||
if ips_add_all.len() > 100 {
|
||||
factor = (ips_add_all.len() / 10) as usize
|
||||
}
|
||||
|
||||
let ips_add_tmp: Vec<IpData> = ips_add_all.clone().iter().map(|x| x.clone()).collect();
|
||||
let mut ips_add_iter = ips_add_tmp.chunks(factor);
|
||||
let mut ips_add: Vec<&[IpData]> = vec![];
|
||||
while let Some(x) = ips_add_iter.next() {
|
||||
ips_add.push(x);
|
||||
}
|
||||
|
||||
// build and add rules
|
||||
for ipdata_group in ips_add.clone() {
|
||||
for ipdata in ipdata_group {
|
||||
match ipdata.t {
|
||||
4 => {
|
||||
createrules!(ipdata, chain4, batch4, Ipv4Addr, ipv4);
|
||||
}
|
||||
6 => {
|
||||
createrules!(ipdata, chain6, batch6, Ipv6Addr, ipv6);
|
||||
}
|
||||
_ => {}
|
||||
for ipdata in ips_add.clone() {
|
||||
match ipdata.t {
|
||||
4 => {
|
||||
createrules!(ipdata, chain4, batch4, Ipv4Addr, ipv4);
|
||||
}
|
||||
6 => {
|
||||
createrules!(ipdata, chain6, batch6, Ipv6Addr, ipv6);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// validate and send batch
|
||||
for b in [batch4, batch6] {
|
||||
let bf = b.finalize();
|
||||
match send_and_process(&bf) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("error sending batch: {e}");
|
||||
}
|
||||
};
|
||||
send_and_process(&bf).unwrap();
|
||||
}
|
||||
if fwlen != &mut ips_add_all.len() {
|
||||
ret.push(format!(
|
||||
"{length} ip in firewall",
|
||||
length = ips_add_all.len()
|
||||
));
|
||||
if fwlen != &mut ips_add.len() {
|
||||
ret.push(format!("{length} ip in firewall", length = ips_add.len()));
|
||||
}
|
||||
*fwlen = ips_add_all.len();
|
||||
*fwlen = ips_add.len();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
10
src/ip.rs
10
src/ip.rs
@ -22,7 +22,7 @@ pub struct IpEvent {
|
||||
pub msgtype: String,
|
||||
pub mode: String,
|
||||
pub hostname: String,
|
||||
pub ipdata: Option<IpData>,
|
||||
pub ipdata: IpData,
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
@ -35,14 +35,6 @@ macro_rules! ipevent {
|
||||
ipdata: $ipdata,
|
||||
}
|
||||
};
|
||||
($msgtype:expr,$mode:expr,$hostname:expr) => {
|
||||
IpEvent {
|
||||
msgtype: String::from($msgtype),
|
||||
mode: String::from($mode),
|
||||
hostname: $hostname,
|
||||
ipdata: None,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
|
63
src/ipblc.rs
63
src/ipblc.rs
@ -11,7 +11,6 @@ use chrono::prelude::*;
|
||||
use chrono::prelude::{DateTime, Local};
|
||||
use chrono::Duration;
|
||||
use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent};
|
||||
use sd_notify::*;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::{channel, Receiver, Sender};
|
||||
@ -22,13 +21,6 @@ const BL_CHAN_SIZE: usize = 32;
|
||||
const WS_CHAN_SIZE: usize = 64;
|
||||
const LOOP_MAX_WAIT: u64 = 5;
|
||||
|
||||
macro_rules! log_with_systemd {
|
||||
($msg:expr) => {
|
||||
println!("{}", $msg);
|
||||
notify(false, &[NotifyState::Status(format!("{}", $msg).as_str())]).unwrap();
|
||||
};
|
||||
}
|
||||
|
||||
pub async fn run() {
|
||||
let inotify = Inotify::init(InitFlags::empty()).unwrap();
|
||||
let globalctx = Context::new(&inotify).await;
|
||||
@ -39,7 +31,7 @@ pub async fn run() {
|
||||
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
|
||||
|
||||
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
|
||||
log_with_systemd!(format!("Launching {}, version {}", PKG_NAME, pkgversion));
|
||||
println!("Launching {}, version {}", PKG_NAME, pkgversion);
|
||||
fwglobalinit();
|
||||
|
||||
let ctxapi = Arc::clone(&ctxarc);
|
||||
@ -68,8 +60,6 @@ pub async fn run() {
|
||||
compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
|
||||
});
|
||||
|
||||
notify(false, &[NotifyState::Ready]).unwrap();
|
||||
|
||||
loop {
|
||||
let mut ret: Vec<String> = Vec::new();
|
||||
|
||||
@ -86,9 +76,8 @@ pub async fn run() {
|
||||
|
||||
if received_ip.msgtype == "bootstrap".to_string() {
|
||||
for ip_to_send in toblock {
|
||||
let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send));
|
||||
let ipe = ipevent!("init","ws",gethostname(true),ip_to_send);
|
||||
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
|
||||
wssocketrr.close(None).unwrap();
|
||||
wssocketrr = websocketreqrep(&ctxwsrr).await;
|
||||
break;
|
||||
}
|
||||
@ -104,24 +93,18 @@ pub async fn run() {
|
||||
// send ip list to api and ws sockets
|
||||
if let Some(ipevent) = filtered_ipevent {
|
||||
if received_ip.msgtype != "init" {
|
||||
log_with_systemd!(format!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip));
|
||||
println!("sending {} to api and ws", ipevent.ipdata.ip);
|
||||
let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata);
|
||||
send_to_ipbl_api(&server.clone(), &ipe).await;
|
||||
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
|
||||
wssocketrr.close(None).unwrap();
|
||||
let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
|
||||
if !status {
|
||||
wssocketrr = websocketreqrep(&ctxwsrr).await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_val = sleep_s(LOOP_MAX_WAIT) => {
|
||||
let ipe = ipevent!("ping", "ws", gethostname(true));
|
||||
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
|
||||
wssocketrr.close(None).unwrap();
|
||||
wssocketrr = websocketreqrep(&ctxwsrr).await;
|
||||
}
|
||||
}
|
||||
_val = sleep_s(LOOP_MAX_WAIT) => {}
|
||||
};
|
||||
|
||||
let ctxclone = Arc::clone(&ctxarc);
|
||||
@ -129,8 +112,7 @@ pub async fn run() {
|
||||
|
||||
// log lines
|
||||
if ret.len() > 0 {
|
||||
let result = ret.join(", ");
|
||||
log_with_systemd!(format!("{result}"));
|
||||
println!("{ret}", ret = ret.join(", "));
|
||||
}
|
||||
|
||||
let ctxclone = Arc::clone(&ctxarc);
|
||||
@ -146,26 +128,8 @@ async fn handle_cfg_reload(
|
||||
) {
|
||||
let now_cfg_reload = Local::now().trunc_subsecs(0);
|
||||
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) {
|
||||
let inotify;
|
||||
loop {
|
||||
inotify = match inoarc.try_read() {
|
||||
Ok(o) => o,
|
||||
Err(e) => {
|
||||
println!("{e}");
|
||||
sleep_s(1).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
break;
|
||||
}
|
||||
let mut ctxtest = match ctxclone.try_write() {
|
||||
Ok(o) => o,
|
||||
Err(e) => {
|
||||
println!("{e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
match ctxtest.load(&inotify).await {
|
||||
let inotify = inoarc.read().await;
|
||||
match ctxclone.write().await.load(&inotify).await {
|
||||
Ok(_) => {
|
||||
*last_cfg_reload = Local::now().trunc_subsecs(0);
|
||||
}
|
||||
@ -189,8 +153,8 @@ async fn handle_fwblock(ctxclone: Arc<RwLock<Context>>, ret: &mut Vec<String>, f
|
||||
// apply firewall blocking
|
||||
match fwblock(&toblock, ret, fwlen) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("err: {e}, unable to push firewall rules, use super user")
|
||||
Err(err) => {
|
||||
println!("Err: {err}, unable to push firewall rules, use super user")
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -241,7 +205,8 @@ async fn compare_files_changes(
|
||||
match modfiles.inevent.name {
|
||||
Some(name) => {
|
||||
let filename = name.to_str().unwrap();
|
||||
for (sak, sa) in sas.clone().iter_mut() {
|
||||
for sak in sas.clone().keys() {
|
||||
let sa = sas.get(sak).unwrap();
|
||||
if modfiles.inevent.wd == sa.wd {
|
||||
let handle: String;
|
||||
if sa.filename.as_str() == "" {
|
||||
@ -279,7 +244,7 @@ async fn compare_files_changes(
|
||||
}
|
||||
}
|
||||
for ip in iplist {
|
||||
let ipe = ipevent!("add", "file", gethostname(true), Some(ip));
|
||||
let ipe = ipevent!("add", "file", gethostname(true), ip);
|
||||
let ipetx = ipeventtx.read().await;
|
||||
ipetx.send(ipe).await.unwrap();
|
||||
}
|
||||
|
@ -4,42 +4,41 @@ use serde_json;
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpSocket;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
|
||||
let ctxarc = ctxarc.clone();
|
||||
let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() };
|
||||
let listener = match TcpListener::bind(addr).await {
|
||||
Ok(o) => o,
|
||||
Err(e) => {
|
||||
println!("error: {e}");
|
||||
let addr = { ctxarc.read().await.cfg.api.parse().unwrap() };
|
||||
|
||||
let socket = TcpSocket::new_v4().unwrap();
|
||||
match socket.bind(addr) {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
println!("can't bind monitoring socket, exiting...");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
}
|
||||
socket.set_reuseaddr(true).unwrap();
|
||||
socket
|
||||
.set_linger(Some(std::time::Duration::from_secs(0)))
|
||||
.unwrap();
|
||||
let listener = socket.listen(128).unwrap();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((mut socket, _addr)) => {
|
||||
let mut buf = vec![0; 1024];
|
||||
Ok((stream, _addr)) => {
|
||||
stream.readable().await.unwrap();
|
||||
let (reader, mut writer) = stream.into_split();
|
||||
let mut buf: [u8; 16] = [0; 16];
|
||||
|
||||
match socket.readable().await {
|
||||
Ok(_) => {
|
||||
match socket.try_read(&mut buf) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("error: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
match reader.try_read(&mut buf) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("error: {e}");
|
||||
continue;
|
||||
println!("error: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
let msg = match String::from_utf8(buf.to_vec()) {
|
||||
Ok(o) => o.trim_matches(char::from(0)).trim().to_string(),
|
||||
Err(_) => "".to_string(),
|
||||
@ -47,15 +46,15 @@ pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
|
||||
|
||||
let res = format_result(&ctxarc, msg.as_str()).await;
|
||||
|
||||
match socket.write_all(res.as_bytes()).await {
|
||||
match writer.write_all(format!("{res}").as_bytes()).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("error: {e}");
|
||||
Err(err) => {
|
||||
println!("ee {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
println!("error: {err}");
|
||||
println!("unable to serialize data: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
11
src/utils.rs
11
src/utils.rs
@ -6,9 +6,9 @@ use tokio::time::{sleep, Duration};
|
||||
|
||||
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
|
||||
let mut file = match File::open(filename) {
|
||||
Ok(o) => o,
|
||||
Err(e) => {
|
||||
println!("error: {e}");
|
||||
Ok(f) => f,
|
||||
Err(err) => {
|
||||
println!("{err}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
@ -21,11 +21,6 @@ pub async fn sleep_s(s: u64) {
|
||||
sleep(Duration::from_secs(s)).await;
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn sleep_ms(m: u64) {
|
||||
sleep(Duration::from_millis(m)).await;
|
||||
}
|
||||
|
||||
pub fn gethostname(show_fqdn: bool) -> String {
|
||||
let hostname_cstr = unistd::gethostname().expect("Failed getting hostname");
|
||||
let fqdn = hostname_cstr
|
||||
|
@ -11,12 +11,12 @@ pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
|
||||
let mut try_req = 0;
|
||||
let client = httpclient();
|
||||
loop {
|
||||
match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await {
|
||||
match push_ip(&client, &server, &ip.ipdata).await {
|
||||
Ok(_) => {
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
println!("error: {e}");
|
||||
Err(err) => {
|
||||
println!("{err}");
|
||||
sleep_s(1).await;
|
||||
if try_req == MAX_FAILED_API_RATE {
|
||||
break;
|
||||
|
@ -14,13 +14,13 @@ use tungstenite::*;
|
||||
pub async fn websocketreqrep(
|
||||
ctxarc: &Arc<RwLock<Context>>,
|
||||
) -> WebSocket<MaybeTlsStream<TcpStream>> {
|
||||
let (mut wssocketrr, bootstrap_event, wscfg);
|
||||
let (mut wssocketrr, bootstrap_event, cfg);
|
||||
{
|
||||
let ctx = ctxarc.read().await;
|
||||
bootstrap_event = ctx.cfg.bootstrap_event().clone();
|
||||
wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
|
||||
cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
|
||||
}
|
||||
wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap();
|
||||
wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
|
||||
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
|
||||
|
||||
return wssocketrr;
|
||||
@ -45,24 +45,15 @@ pub async fn websocketpubsub(
|
||||
Ok(msg) => {
|
||||
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
|
||||
Ok(o) => o,
|
||||
Err(e) => {
|
||||
println!("error in pubsub: {e:?}");
|
||||
Err(_e) => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match tosend.ipdata.clone() {
|
||||
Some(o) => {
|
||||
if o.hostname != gethostname(true)
|
||||
|| tosend.msgtype == "init".to_string()
|
||||
{
|
||||
let txps = txpubsub.read().await;
|
||||
txps.send(tosend).await.unwrap();
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let txps = txpubsub.read().await;
|
||||
txps.send(tosend.clone()).await.unwrap();
|
||||
}
|
||||
if tosend.ipdata.hostname != gethostname(true)
|
||||
|| tosend.msgtype == "init".to_string()
|
||||
{
|
||||
let txps = txpubsub.read().await;
|
||||
txps.send(tosend).await.unwrap();
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@ -114,12 +105,11 @@ pub async fn send_to_ipbl_websocket(
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("err send read: {e:?}");
|
||||
return false;
|
||||
return handle_websocket_error(ws);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
println!("can't write to socket");
|
||||
return false;
|
||||
return handle_websocket_error(ws);
|
||||
};
|
||||
|
||||
if ws.can_read() {
|
||||
@ -127,13 +117,16 @@ pub async fn send_to_ipbl_websocket(
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("err send read: {e:?}");
|
||||
return false;
|
||||
return handle_websocket_error(ws);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
println!("can't read from socket");
|
||||
return false;
|
||||
return handle_websocket_error(ws);
|
||||
};
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
fn handle_websocket_error(ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> bool {
|
||||
ws.close(None).unwrap();
|
||||
return false;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user