Compare commits

..

No commits in common. "master" and "1.5.1" have entirely different histories.

13 changed files with 697 additions and 883 deletions

View File

@ -20,8 +20,7 @@ steps:
- cargo test --verbose --all - cargo test --verbose --all
environment: environment:
RUSTC_WRAPPER: /usr/bin/sccache RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com SCCACHE_REDIS: redis://sys01.paulbsd.com:6379/1
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -44,8 +43,7 @@ steps:
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment: environment:
RUSTC_WRAPPER: /usr/bin/sccache RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com SCCACHE_REDIS: redis://sys01.paulbsd.com:6379/1
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -99,8 +97,7 @@ steps:
- cargo test --verbose --all - cargo test --verbose --all
environment: environment:
RUSTC_WRAPPER: /usr/bin/sccache RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com SCCACHE_REDIS: redis://sys01.paulbsd.com:6379/1
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -123,8 +120,7 @@ steps:
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment: environment:
RUSTC_WRAPPER: /usr/bin/sccache RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com SCCACHE_REDIS: redis://sys01.paulbsd.com:6379/1
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry

831
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package] [package]
name = "ipblc" name = "ipblc"
version = "1.7.0" version = "1.5.0"
edition = "2021" edition = "2021"
authors = ["PaulBSD <paul@paulbsd.com>"] authors = ["PaulBSD <paul@paulbsd.com>"]
description = "ipblc is a tool that search and send attacking ip addresses to ipbl" description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
@ -21,9 +21,8 @@ regex = "1.10"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sd-notify = { version = "0.4" } tokio = { version = "1.33", features = ["full", "sync"] }
tokio = { version = "1.35", features = ["full", "sync"] } tungstenite = { version = "0.20", features = ["handshake", "rustls-tls-native-roots"] }
tungstenite = { version = "0.21", features = ["handshake", "rustls-tls-native-roots"] }
## to optimize binary size (slow compile time) ## to optimize binary size (slow compile time)
#[profile.release] #[profile.release]

View File

@ -48,7 +48,6 @@ Options:
- ✅ Local bound tcp api socket - ✅ Local bound tcp api socket
- ✅ ZMQ -> Websocket - ✅ ZMQ -> Websocket
- ✅ Bug in RwLocks (agent often give up) - ✅ Bug in RwLocks (agent often give up)
- ❌ Create memory friendly structs for ipdata
### Notes ### Notes

View File

@ -17,7 +17,7 @@ use std::path::Path;
pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]); pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]);
const MASTERSERVER: &str = "ipbl.paulbsd.com"; const MASTERSERVER: &str = "ipbl.paulbsd.com";
const WSSUBSCRIPTION: &str = "ipbl"; const WSSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY_INTERVAL: u64 = 2; const CONFIG_RETRY: u64 = 2;
const WEB_CLIENT_TIMEOUT: i64 = 5; const WEB_CLIENT_TIMEOUT: i64 = 5;
#[derive(Debug)] #[derive(Debug)]
@ -28,7 +28,6 @@ pub struct Context {
pub flags: Flags, pub flags: Flags,
pub sas: HashMap<String, SetMap>, pub sas: HashMap<String, SetMap>,
pub hashwd: HashMap<String, WatchDescriptor>, pub hashwd: HashMap<String, WatchDescriptor>,
pub reloadinterval: isize,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -36,7 +35,7 @@ pub struct SetMap {
pub filename: String, pub filename: String,
pub fullpath: String, pub fullpath: String,
pub regex: Regex, pub regex: Regex,
pub set: SetCfg, pub set: Set,
pub watchedfiles: HashMap<String, u64>, pub watchedfiles: HashMap<String, u64>,
pub wd: WatchDescriptor, pub wd: WatchDescriptor,
} }
@ -65,7 +64,6 @@ impl Context {
sas: HashMap::new(), sas: HashMap::new(),
blocklist: HashMap::new(), blocklist: HashMap::new(),
hashwd: HashMap::new(), hashwd: HashMap::new(),
reloadinterval: 5,
}; };
print!("Loading config ... "); print!("Loading config ... ");
@ -103,12 +101,12 @@ impl Context {
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: Discovery = match req.json().await { let data: Discovery = match req.json().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
Ok(data) Ok(data)
} }
@ -127,10 +125,10 @@ impl Context {
} }
break; break;
} }
Err(e) => { Err(err) => {
println!("error loading config: {e}, retrying in {CONFIG_RETRY_INTERVAL}s"); println!("error loading config: {err}, retrying in {CONFIG_RETRY}s");
last_in_err = true; last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await; sleep_s(CONFIG_RETRY).await;
} }
}; };
} }
@ -169,41 +167,38 @@ impl Context {
} }
pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> { pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> {
match &ipevent.ipdata { match self.cfg.sets.get(&ipevent.ipdata.src) {
Some(ipdata) => match self.cfg.sets.get(&ipdata.src) { Some(set) => {
Some(set) => { let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str())
let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str()) .unwrap()
.unwrap() .with_timezone(&chrono::Local);
.with_timezone(&chrono::Local); let blocktime = set.blocktime;
let blocktime = set.blocktime; if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname { let block = self
let block = .blocklist
self.blocklist .entry(ipevent.ipdata.ip.to_string())
.entry(ipdata.ip.to_string()) .or_insert(BlockIpData {
.or_insert(BlockIpData { ipdata: ipevent.ipdata.clone(),
ipdata: ipdata.clone(), tryfail: 0,
tryfail: 0, starttime,
starttime, blocktime,
blocktime, });
}); block.tryfail += 1;
block.tryfail += 1; block.blocktime = blocktime;
block.blocktime = blocktime; if block.tryfail >= set.tryfail {
if block.tryfail >= set.tryfail { return Some(ipevent.clone());
return Some(ipevent.clone());
}
} else {
self.blocklist
.entry(ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
});
} }
} else {
self.blocklist
.entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipevent.ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
});
} }
None => {} }
},
None => {} None => {}
} }
None None
@ -287,7 +282,7 @@ impl Context {
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Config { pub struct Config {
pub sets: HashMap<String, SetCfg>, pub sets: HashMap<String, Set>,
#[serde(skip_serializing)] #[serde(skip_serializing)]
pub trustnets: Vec<String>, pub trustnets: Vec<String>,
pub ws: HashMap<String, WebSocketCfg>, pub ws: HashMap<String, WebSocketCfg>,
@ -299,7 +294,7 @@ impl Config {
Self { Self {
sets: HashMap::from([ sets: HashMap::from([
("smtp".to_string(), ("smtp".to_string(),
SetCfg { Set {
src: "smtp".to_string(), src: "smtp".to_string(),
filename: "mail.log".to_string(), filename: "mail.log".to_string(),
regex: "(SASL LOGIN authentication failed)".to_string(), regex: "(SASL LOGIN authentication failed)".to_string(),
@ -308,7 +303,7 @@ impl Config {
tryfail: 5, tryfail: 5,
}), }),
("ssh".to_string(), ("ssh".to_string(),
SetCfg { Set {
src: "ssh".to_string(), src: "ssh".to_string(),
filename: "auth.log".to_string(), filename: "auth.log".to_string(),
regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(), regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(),
@ -317,7 +312,7 @@ impl Config {
tryfail: 5, tryfail: 5,
},), },),
("http".to_string(), ("http".to_string(),
SetCfg { Set {
src: "http".to_string(), src: "http".to_string(),
filename: "".to_string(), filename: "".to_string(),
regex: "(anonymousfox.co)".to_string(), regex: "(anonymousfox.co)".to_string(),
@ -326,7 +321,7 @@ impl Config {
tryfail: 5, tryfail: 5,
},), },),
("openvpn".to_string(), ("openvpn".to_string(),
SetCfg { Set {
src: "openvpn".to_string(), src: "openvpn".to_string(),
filename: "status".to_string(), filename: "status".to_string(),
regex: "(UNDEF)".to_string(), regex: "(UNDEF)".to_string(),
@ -355,40 +350,96 @@ impl Config {
} }
pub async fn load(&mut self, server: &String) -> Result<(), ReqError> { pub async fn load(&mut self, server: &String) -> Result<(), ReqError> {
self.get_config(server).await?; self.get_global_config(server).await?;
self.get_trustnets(server).await?;
self.get_sets(server).await?;
self.get_ws_config(server).await?;
Ok(()) Ok(())
} }
async fn get_config(&mut self, server: &String) -> Result<(), ReqError> { async fn get_global_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> =
httpclient().get(format!("{server}/config")).send().await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: HashMap<String, GlobalConfig> = match req.json::<Vec<GlobalConfig>>().await {
Ok(res) => {
let mut out: HashMap<String, GlobalConfig> = HashMap::new();
res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.key.to_string(), x);
});
out
}
Err(err) => return Err(err),
};
let key = "".to_string();
self.api = data
.get(&key.to_string())
.unwrap_or(&GlobalConfig {
key: "api".to_string(),
value: "127.0.0.1:8060".to_string(),
})
.value
.clone();
Ok(())
}
async fn get_trustnets(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient() let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/config?v=2")) .get(format!("{server}/config/trustlist"))
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await { let data: Vec<String> = match req.json::<Vec<String>>().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
self.trustnets = data;
Ok(())
}
for d in data.sets { async fn get_sets(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/config/sets"))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<Set> = match req.json::<Vec<Set>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
for d in data {
self.sets.insert(d.src.clone(), d); self.sets.insert(d.src.clone(), d);
} }
Ok(())
}
self.trustnets = data.trustlists; async fn get_ws_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> =
data.ws.into_iter().map(|x| x).for_each(|x| { httpclient().get(format!("{server}/config/ws")).send().await;
self.ws.insert(x.t.to_string(), x); let req = match resp {
}); Ok(re) => re,
Err(err) => return Err(err),
self.api = data };
.cfg let data: HashMap<String, WebSocketCfg> = match req.json::<Vec<WebSocketCfg>>().await {
.get(&"api".to_string()) Ok(res) => {
.unwrap_or(&self.api) let mut out: HashMap<String, WebSocketCfg> = HashMap::new();
.clone(); res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.t.to_string(), x);
});
out
}
Err(err) => return Err(err),
};
self.ws = data;
Ok(()) Ok(())
} }
@ -400,13 +451,13 @@ impl Config {
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await { let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
Ok(data) Ok(data)
@ -417,8 +468,8 @@ impl Config {
for trustnet in &self.trustnets { for trustnet in &self.trustnets {
match trustnet.parse() { match trustnet.parse() {
Ok(net) => trustnets.push(net), Ok(net) => trustnets.push(net),
Err(e) => { Err(err) => {
println!("error parsing {trustnet}, error: {e}"); println!("error parsing {trustnet}, error: {err}");
} }
}; };
} }
@ -430,7 +481,13 @@ impl Config {
msgtype: String::from("bootstrap"), msgtype: String::from("bootstrap"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: gethostname(true), hostname: gethostname(true),
ipdata: None, ipdata: IpData {
t: 4,
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
},
} }
} }
} }
@ -451,15 +508,13 @@ pub fn httpclient() -> Client {
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct GlobalConfigV2 { pub struct GlobalConfig {
pub cfg: HashMap<String, String>, pub key: String,
pub sets: Vec<SetCfg>, pub value: String,
pub trustlists: Vec<String>,
pub ws: Vec<WebSocketCfg>,
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SetCfg { pub struct Set {
pub src: String, pub src: String,
pub filename: String, pub filename: String,
pub regex: String, pub regex: String,
@ -488,13 +543,13 @@ pub struct URL {
pub path: String, pub path: String,
} }
impl PartialEq for SetCfg { impl PartialEq for Set {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.src == other.src self.src == other.src
} }
} }
impl Hash for SetCfg { impl Hash for Set {
fn hash<H: Hasher>(&self, state: &mut H) { fn hash<H: Hasher>(&self, state: &mut H) {
self.src.hash(state); self.src.hash(state);
} }
@ -518,13 +573,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4, t: 4,
ip: "1.1.1.1".to_string(), ip: "1.1.1.1".to_string(),
hostname: "test1".to_string(), hostname: "test1".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "ssh".to_string(), src: "ssh".to_string(),
}), },
}) })
.await; .await;
} }
@ -534,13 +589,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4, t: 4,
ip: "1.1.1.2".to_string(), ip: "1.1.1.2".to_string(),
hostname: "test2".to_string(), hostname: "test2".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
} }
@ -549,13 +604,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4, t: 4,
ip: "1.1.1.3".to_string(), ip: "1.1.1.3".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
@ -563,13 +618,13 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4, t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
@ -577,26 +632,26 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4, t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 6, t: 6,
ip: "2a00:1450:4007:805::2003".to_string(), ip: "2a00:1450:4007:805::2003".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;

133
src/fw.rs
View File

@ -19,41 +19,6 @@ pub fn fwglobalinit<'a>() -> ((Batch, Table), (Batch, Table)) {
((batch4, table4), (batch6, table6)) ((batch4, table4), (batch6, table6))
} }
macro_rules! initrules {
($batch:expr, $table:expr, $chain:ident) => {
$chain.set_hook(nftnl::Hook::In, 1);
$chain.set_policy(nftnl::Policy::Accept);
$batch.add(&$chain, nftnl::MsgType::Add);
$batch.add(&Rule::new(&$chain), nftnl::MsgType::Del);
let mut rule = Rule::new(&$chain);
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept));
$batch.add(&rule, nftnl::MsgType::Add);
};
}
macro_rules! createrules {
($ipdata:ident, $chain:ident, $batch:ident, $t:ty, $ip_t:ident) => {
let mut rule = Rule::new(&$chain);
let ip = $ipdata.ip.parse::<$t>().unwrap();
rule.add_expr(&nft_expr!(payload $ip_t saddr));
rule.add_expr(&nft_expr!(cmp == ip));
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict drop));
$batch.add(&rule, nftnl::MsgType::Add);
}
}
fn fwinit(t: FwTableType) -> (Batch, Table) { fn fwinit(t: FwTableType) -> (Batch, Table) {
let table_name: String; let table_name: String;
let table: Table; let table: Table;
@ -83,41 +48,77 @@ fn fwinit(t: FwTableType) -> (Batch, Table) {
} }
pub fn fwblock( pub fn fwblock(
ips_add_all: &Vec<IpData>, ips_add: &Vec<IpData>,
ret: &mut Vec<String>, ret: &mut Vec<String>,
fwlen: &mut usize, fwlen: &mut usize,
) -> std::result::Result<(), Error> { ) -> std::result::Result<(), Error> {
let ((mut batch4, table4), (mut batch6, table6)) = fwglobalinit(); let ((mut batch4, table4), (mut batch6, table6)) = fwglobalinit();
// build chain for ipv4
let mut chain4 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table4); let mut chain4 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table4);
chain4.set_hook(nftnl::Hook::In, 1);
chain4.set_policy(nftnl::Policy::Accept);
// add chain
batch4.add(&chain4, nftnl::MsgType::Add);
batch4.add(&Rule::new(&chain4), nftnl::MsgType::Del);
let mut rule4 = Rule::new(&chain4);
rule4.add_expr(&nft_expr!(ct state));
rule4.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule4.add_expr(&nft_expr!(cmp != 0u32));
rule4.add_expr(&nft_expr!(counter));
rule4.add_expr(&nft_expr!(verdict accept));
batch4.add(&rule4, nftnl::MsgType::Add);
// build chain for ipv6
let mut chain6 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table6); let mut chain6 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table6);
chain6.set_hook(nftnl::Hook::In, 1);
chain6.set_policy(nftnl::Policy::Accept);
initrules!(batch4, table4, chain4); // add chain
initrules!(batch6, table6, chain6); batch6.add(&chain6, nftnl::MsgType::Add);
let mut factor = 1; batch6.add(&Rule::new(&chain6), nftnl::MsgType::Del);
if ips_add_all.len() > 100 {
factor = (ips_add_all.len() / 10) as usize
}
let ips_add_tmp: Vec<IpData> = ips_add_all.clone().iter().map(|x| x.clone()).collect(); let mut rule6 = Rule::new(&chain6);
let mut ips_add_iter = ips_add_tmp.chunks(factor); rule6.add_expr(&nft_expr!(ct state));
let mut ips_add: Vec<&[IpData]> = vec![]; rule6.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
while let Some(x) = ips_add_iter.next() { rule6.add_expr(&nft_expr!(cmp != 0u32));
ips_add.push(x); rule6.add_expr(&nft_expr!(counter));
} rule6.add_expr(&nft_expr!(verdict accept));
batch6.add(&rule6, nftnl::MsgType::Add);
// build and add rules // build and add rules
for ipdata_group in ips_add.clone() { for ipdata in ips_add.clone() {
for ipdata in ipdata_group { match ipdata.t {
match ipdata.t { 4 => {
4 => { let ip = ipdata.ip.parse::<Ipv4Addr>().unwrap();
createrules!(ipdata, chain4, batch4, Ipv4Addr, ipv4); let mut rule = Rule::new(&chain4);
} rule.add_expr(&nft_expr!(payload ipv4 saddr));
6 => { rule.add_expr(&nft_expr!(cmp == ip));
createrules!(ipdata, chain6, batch6, Ipv6Addr, ipv6); rule.add_expr(&nft_expr!(ct state));
} rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
_ => {} rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict drop));
batch4.add(&rule, nftnl::MsgType::Add);
}
6 => {
let ip = ipdata.ip.parse::<Ipv6Addr>().unwrap();
let mut rule = Rule::new(&chain6);
rule.add_expr(&nft_expr!(payload ipv6 saddr));
rule.add_expr(&nft_expr!(cmp == ip));
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict drop));
batch6.add(&rule, nftnl::MsgType::Add);
}
_ => {
todo!()
} }
} }
} }
@ -125,20 +126,12 @@ pub fn fwblock(
// validate and send batch // validate and send batch
for b in [batch4, batch6] { for b in [batch4, batch6] {
let bf = b.finalize(); let bf = b.finalize();
match send_and_process(&bf) { send_and_process(&bf).unwrap();
Ok(_) => {}
Err(e) => {
println!("error sending batch: {e}");
}
};
} }
if fwlen != &mut ips_add_all.len() { if fwlen != &mut ips_add.len() {
ret.push(format!( ret.push(format!("{length} ip in firewall", length = ips_add.len()));
"{length} ip in firewall",
length = ips_add_all.len()
));
} }
*fwlen = ips_add_all.len(); *fwlen = ips_add.len();
Ok(()) Ok(())
} }

View File

@ -22,7 +22,7 @@ pub struct IpEvent {
pub msgtype: String, pub msgtype: String,
pub mode: String, pub mode: String,
pub hostname: String, pub hostname: String,
pub ipdata: Option<IpData>, pub ipdata: IpData,
} }
#[macro_export] #[macro_export]
@ -35,14 +35,6 @@ macro_rules! ipevent {
ipdata: $ipdata, ipdata: $ipdata,
} }
}; };
($msgtype:expr,$mode:expr,$hostname:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: None,
}
};
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -62,19 +54,6 @@ pub struct IpData {
pub hostname: String, pub hostname: String,
} }
#[macro_export]
macro_rules! ipdata {
($t:expr,$ip:expr,$src:expr,$date:expr,$hostname:expr) => {
IpData {
t: $t.clone(),
ip: $ip.clone(),
src: $src.clone(),
date: $date.clone(),
hostname: $hostname.clone(),
}
};
}
impl PartialEq for IpData { impl PartialEq for IpData {
fn eq(&self, other: &IpData) -> bool { fn eq(&self, other: &IpData) -> bool {
self.ip.as_bytes() == other.ip.as_bytes() && self.src == other.src self.ip.as_bytes() == other.ip.as_bytes() && self.src == other.src
@ -144,14 +123,6 @@ pub fn filter(
} }
}; };
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
let s_date: DateTime<Local>; let s_date: DateTime<Local>;
match R_DATE.captures(l.as_str()) { match R_DATE.captures(l.as_str()) {
Some(sdt) => { Some(sdt) => {
@ -165,8 +136,22 @@ pub fn filter(
} }
}; };
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
if !is_trusted(&ipaddr, &trustnets) { if !is_trusted(&ipaddr, &trustnets) {
iplist.push(ipdata!(t, s_ipaddr, src, s_date.to_rfc3339(), hostname)); iplist.push(IpData {
ip: s_ipaddr,
t: t,
src: src.to_owned(),
date: s_date.to_rfc3339().to_owned(),
hostname: hostname.to_owned(),
});
ips += 1; ips += 1;
}; };
} }

View File

@ -11,7 +11,6 @@ use chrono::prelude::*;
use chrono::prelude::{DateTime, Local}; use chrono::prelude::{DateTime, Local};
use chrono::Duration; use chrono::Duration;
use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent}; use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent};
use sd_notify::*;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
@ -22,13 +21,6 @@ const BL_CHAN_SIZE: usize = 32;
const WS_CHAN_SIZE: usize = 64; const WS_CHAN_SIZE: usize = 64;
const LOOP_MAX_WAIT: u64 = 5; const LOOP_MAX_WAIT: u64 = 5;
macro_rules! log_with_systemd {
($msg:expr) => {
println!("{}", $msg);
notify(false, &[NotifyState::Status(format!("{}", $msg).as_str())]).unwrap();
};
}
pub async fn run() { pub async fn run() {
let inotify = Inotify::init(InitFlags::empty()).unwrap(); let inotify = Inotify::init(InitFlags::empty()).unwrap();
let globalctx = Context::new(&inotify).await; let globalctx = Context::new(&inotify).await;
@ -39,7 +31,7 @@ pub async fn run() {
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0); let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
log_with_systemd!(format!("Launching {}, version {}", PKG_NAME, pkgversion)); println!("Launching {}, version {}", PKG_NAME, pkgversion);
fwglobalinit(); fwglobalinit();
let ctxapi = Arc::clone(&ctxarc); let ctxapi = Arc::clone(&ctxarc);
@ -68,8 +60,6 @@ pub async fn run() {
compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await; compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
}); });
notify(false, &[NotifyState::Ready]).unwrap();
loop { loop {
let mut ret: Vec<String> = Vec::new(); let mut ret: Vec<String> = Vec::new();
@ -79,16 +69,17 @@ pub async fn run() {
ipevent = ipeventrx.recv() => { ipevent = ipeventrx.recv() => {
let received_ip = ipevent.unwrap(); let received_ip = ipevent.unwrap();
let (toblock,server) = { let (toblock,server);
{
let ctx = ctxclone.read().await; let ctx = ctxclone.read().await;
(ctx.get_blocklist_toblock().await,ctx.flags.server.clone()) toblock = ctx.get_blocklist_toblock().await;
}; server = ctx.flags.server.clone();
}
if received_ip.msgtype == "bootstrap".to_string() { if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in toblock { for ip_to_send in toblock {
let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send)); let ipe = ipevent!("init","ws",gethostname(true),ip_to_send);
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
break; break;
} }
@ -97,31 +88,27 @@ pub async fn run() {
} }
// refresh context blocklist // refresh context blocklist
let filtered_ipevent = { let filtered_ipevent;
ctxarc.write().await.update_blocklist(&received_ip).await {
}; let mut ctx = ctxarc.write().await;
filtered_ipevent = ctx.update_blocklist(&received_ip).await;
}
// send ip list to api and ws sockets // send ip list to api and ws sockets
if let Some(ipevent) = filtered_ipevent { if let Some(ipevent) = filtered_ipevent {
if received_ip.msgtype != "init" { if received_ip.msgtype != "init" {
log_with_systemd!(format!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip)); println!("sending {} to api and ws", ipevent.ipdata.ip);
let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata); let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata);
send_to_ipbl_api(&server.clone(), &ipe).await; send_to_ipbl_api(&server.clone(), &ipe).await;
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
wssocketrr.close(None).unwrap(); if !status {
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
continue; continue;
} }
} }
} }
} }
_val = sleep_s(LOOP_MAX_WAIT) => { _val = sleep_s(LOOP_MAX_WAIT) => {}
let ipe = ipevent!("ping", "ws", gethostname(true));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
}
}
}; };
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
@ -129,8 +116,7 @@ pub async fn run() {
// log lines // log lines
if ret.len() > 0 { if ret.len() > 0 {
let result = ret.join(", "); println!("{ret}", ret = ret.join(", "));
log_with_systemd!(format!("{result}"));
} }
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
@ -146,26 +132,9 @@ async fn handle_cfg_reload(
) { ) {
let now_cfg_reload = Local::now().trunc_subsecs(0); let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) { if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) {
let inotify; let mut ctx = ctxclone.write().await;
loop { let inotify = inoarc.read().await;
inotify = match inoarc.try_read() { match ctx.load(&inotify).await {
Ok(o) => o,
Err(e) => {
println!("{e}");
sleep_s(1).await;
continue;
}
};
break;
}
let mut ctxtest = match ctxclone.try_write() {
Ok(o) => o,
Err(e) => {
println!("{e}");
return;
}
};
match ctxtest.load(&inotify).await {
Ok(_) => { Ok(_) => {
*last_cfg_reload = Local::now().trunc_subsecs(0); *last_cfg_reload = Local::now().trunc_subsecs(0);
} }
@ -189,8 +158,8 @@ async fn handle_fwblock(ctxclone: Arc<RwLock<Context>>, ret: &mut Vec<String>, f
// apply firewall blocking // apply firewall blocking
match fwblock(&toblock, ret, fwlen) { match fwblock(&toblock, ret, fwlen) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(err) => {
println!("err: {e}, unable to push firewall rules, use super user") println!("Err: {err}, unable to push firewall rules, use super user")
} }
}; };
} }
@ -232,16 +201,20 @@ async fn compare_files_changes(
let modfiles = inrx.recv().await.unwrap(); let modfiles = inrx.recv().await.unwrap();
let mut iplist: Vec<IpData> = vec![]; let mut iplist: Vec<IpData> = vec![];
let sas = { let sask;
let sas;
{
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
sas = ctx.sas.clone();
sask = sas.keys();
tnets = ctx.cfg.build_trustnets(); tnets = ctx.cfg.build_trustnets();
ctx.sas.clone() }
};
match modfiles.inevent.name { match modfiles.inevent.name {
Some(name) => { Some(name) => {
let filename = name.to_str().unwrap(); let filename = name.to_str().unwrap();
for (sak, sa) in sas.clone().iter_mut() { for sak in sask {
let sa = sas.get(sak).unwrap();
if modfiles.inevent.wd == sa.wd { if modfiles.inevent.wd == sa.wd {
let handle: String; let handle: String;
if sa.filename.as_str() == "" { if sa.filename.as_str() == "" {
@ -252,11 +225,13 @@ async fn compare_files_changes(
continue; continue;
} }
let (filesize, sizechanged) = { let (filesize, sizechanged);
{
let mut ctx = ctxarc.write().await; let mut ctx = ctxarc.write().await;
let sa = ctx.sas.get_mut(sak).unwrap(); let sa = ctx.sas.get_mut(sak).unwrap();
get_last_file_size(&mut sa.watchedfiles, &handle).await (filesize, sizechanged) =
}; get_last_file_size(&mut sa.watchedfiles, &handle).await;
}
if !sizechanged { if !sizechanged {
continue; continue;
@ -279,7 +254,7 @@ async fn compare_files_changes(
} }
} }
for ip in iplist { for ip in iplist {
let ipe = ipevent!("add", "file", gethostname(true), Some(ip)); let ipe = ipevent!("add", "file", gethostname(true), ip);
let ipetx = ipeventtx.read().await; let ipetx = ipeventtx.read().await;
ipetx.send(ipe).await.unwrap(); ipetx.send(ipe).await.unwrap();
} }

View File

@ -4,72 +4,53 @@ use serde_json;
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener; use tokio::net::TcpSocket;
use tokio::sync::RwLock; use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> { pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxarc = ctxarc.clone(); let ctxarc = ctxarc.clone();
let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() }; let addr;
let listener = match TcpListener::bind(addr).await { {
Ok(o) => o, let ctx = ctxarc.read().await;
Err(e) => { addr = ctx.cfg.api.parse().unwrap();
println!("error: {e}"); }
std::process::exit(1);
} let socket = TcpSocket::new_v4().unwrap();
}; socket.bind(addr).unwrap();
socket.set_reuseaddr(true).unwrap();
let listener = socket.listen(1024).unwrap();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
match listener.accept().await { match listener.accept().await {
Ok((mut socket, _addr)) => { Ok((stream, _addr)) => {
let mut buf = vec![0; 1024]; //let mut buf = [0; 1024];
let data;
match socket.readable().await { {
Ok(_) => { let ctx = ctxarc.read().await;
match socket.try_read(&mut buf) { data = serde_json::to_string(&ctx.blocklist);
Ok(_) => {}
Err(e) => {
println!("error: {e}");
continue;
}
};
}
Err(e) => {
println!("error: {e}");
continue;
}
} }
let msg = match String::from_utf8(buf.to_vec()) { match data {
Ok(o) => o.trim_matches(char::from(0)).trim().to_string(), Ok(dt) => {
Err(_) => "".to_string(), let (_reader, mut writer) = stream.into_split();
}; match writer.write_all(format!("{dt}").as_bytes()).await {
Ok(_) => {}
let res = format_result(&ctxarc, msg.as_str()).await; Err(err) => {
println!("{err}");
match socket.write_all(res.as_bytes()).await { }
Ok(_) => {} }
Err(e) => { }
println!("error: {e}"); Err(err) => {
println!("unable to serialize data: {err}");
} }
} }
} }
Err(err) => { Err(err) => {
println!("error: {err}"); println!("couldn't get client: {}", err)
} }
} }
} }
}); });
Ok(()) Ok(())
} }
async fn format_result(ctxarc: &Arc<RwLock<Context>>, mode: &str) -> String {
let data;
let ctx = ctxarc.read().await;
match mode {
"cfg" => data = serde_json::to_string(&ctx.cfg).unwrap(),
"blocklist" => data = serde_json::to_string(&ctx.blocklist).unwrap(),
_ => data = serde_json::to_string(&ctx.blocklist).unwrap(),
};
data
}

View File

@ -1,28 +0,0 @@
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}
pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
// Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
}
pub async fn _sleep_ms(ms: u64) {
sleep(Duration::from_millis(ms)).await;
}

View File

@ -1,14 +1,21 @@
use lazy_static::lazy_static;
use nix::unistd; use nix::unistd;
use regex::Regex;
use std::boxed::Box; use std::boxed::Box;
use std::fs::File; use std::fs::File;
use std::io::*; use std::io::*;
use std::path::Path;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
lazy_static! {
static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap();
}
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> { pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
let mut file = match File::open(filename) { let mut file = match File::open(filename) {
Ok(o) => o, Ok(f) => f,
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
return None; return None;
} }
}; };
@ -17,13 +24,21 @@ pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
Some(lines) Some(lines)
} }
pub async fn sleep_s(s: u64) { pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
sleep(Duration::from_secs(s)).await; // Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
} }
#[allow(dead_code)] pub async fn _sleep_ms(ms: u64) {
pub async fn sleep_ms(m: u64) { sleep(Duration::from_millis(ms)).await;
sleep(Duration::from_millis(m)).await; }
pub async fn sleep_s(s: u64) {
sleep(Duration::from_secs(s)).await;
} }
pub fn gethostname(show_fqdn: bool) -> String { pub fn gethostname(show_fqdn: bool) -> String {
@ -38,3 +53,19 @@ pub fn gethostname(show_fqdn: bool) -> String {
} }
hostname[0].to_string() hostname[0].to_string()
} }
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}

View File

@ -11,12 +11,12 @@ pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut try_req = 0; let mut try_req = 0;
let client = httpclient(); let client = httpclient();
loop { loop {
match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await { match push_ip(&client, &server, &ip.ipdata).await {
Ok(_) => { Ok(_) => {
break; break;
} }
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
sleep_s(1).await; sleep_s(1).await;
if try_req == MAX_FAILED_API_RATE { if try_req == MAX_FAILED_API_RATE {
break; break;

View File

@ -14,13 +14,13 @@ use tungstenite::*;
pub async fn websocketreqrep( pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> { ) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event, wscfg); let (mut wssocketrr, bootstrap_event, cfg);
{ {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
bootstrap_event = ctx.cfg.bootstrap_event().clone(); bootstrap_event = ctx.cfg.bootstrap_event().clone();
wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone(); cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
} }
wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap(); wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr; return wssocketrr;
@ -45,24 +45,15 @@ pub async fn websocketpubsub(
Ok(msg) => { Ok(msg) => {
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) { let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
Ok(o) => o, Ok(o) => o,
Err(e) => { Err(_e) => {
println!("error in pubsub: {e:?}");
continue; continue;
} }
}; };
match tosend.ipdata.clone() { if tosend.ipdata.hostname != gethostname(true)
Some(o) => { || tosend.msgtype == "init".to_string()
if o.hostname != gethostname(true) {
|| tosend.msgtype == "init".to_string() let txps = txpubsub.read().await;
{ txps.send(tosend).await.unwrap();
let txps = txpubsub.read().await;
txps.send(tosend).await.unwrap();
}
}
None => {
let txps = txpubsub.read().await;
txps.send(tosend.clone()).await.unwrap();
}
} }
} }
Err(e) => { Err(e) => {
@ -114,12 +105,11 @@ pub async fn send_to_ipbl_websocket(
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err send read: {e:?}"); println!("err send read: {e:?}");
return false; return handle_websocket_error(ws);
} }
}; };
} else { } else {
println!("can't write to socket"); return handle_websocket_error(ws);
return false;
}; };
if ws.can_read() { if ws.can_read() {
@ -127,13 +117,16 @@ pub async fn send_to_ipbl_websocket(
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err send read: {e:?}"); println!("err send read: {e:?}");
return false; return handle_websocket_error(ws);
} }
}; };
} else { } else {
println!("can't read from socket"); return handle_websocket_error(ws);
return false;
}; };
true true
} }
fn handle_websocket_error(ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> bool {
ws.close(None).unwrap();
return false;
}