Compare commits

..

No commits in common. "master" and "1.2.1" have entirely different histories.

15 changed files with 956 additions and 1143 deletions

View File

@ -8,20 +8,13 @@ platform:
arch: amd64 arch: amd64
steps: steps:
- name: build and test - name: test and build
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev - apt-get install -y libnftnl-dev libmnl-dev
- curl -o /usr/bin/sccache https://assets.paulbsd.com/sccache_linux_amd64
- chmod +x /usr/bin/sccache
- cargo build --verbose --all - cargo build --verbose --all
- cargo test --verbose --all - cargo test --verbose --all
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -33,19 +26,12 @@ steps:
- tag - tag
- name: release - name: release
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev - apt-get install -y libnftnl-dev libmnl-dev
- curl -o /usr/bin/sccache https://assets.paulbsd.com/sccache_linux_amd64
- chmod +x /usr/bin/sccache
- cargo build --release --verbose --all - cargo build --release --verbose --all
- cd target/release - cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -87,20 +73,13 @@ platform:
arch: arm64 arch: arm64
steps: steps:
- name: build and test - name: test and build
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev - apt-get install -y libnftnl-dev libmnl-dev
- curl -o /usr/bin/sccache https://assets.paulbsd.com/sccache_linux_arm64
- chmod +x /usr/bin/sccache
- cargo build --verbose --all - cargo build --verbose --all
- cargo test --verbose --all - cargo test --verbose --all
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -112,19 +91,12 @@ steps:
- tag - tag
- name: release - name: release
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev - apt-get install -y libnftnl-dev libmnl-dev
- curl -o /usr/bin/sccache https://assets.paulbsd.com/sccache_linux_arm64
- chmod +x /usr/bin/sccache
- cargo build --release --verbose --all - cargo build --release --verbose --all
- cd target/release - cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry

1054
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package] [package]
name = "ipblc" name = "ipblc"
version = "1.7.0" version = "1.2.1"
edition = "2021" edition = "2021"
authors = ["PaulBSD <paul@paulbsd.com>"] authors = ["PaulBSD <paul@paulbsd.com>"]
description = "ipblc is a tool that search and send attacking ip addresses to ipbl" description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
@ -10,20 +10,19 @@ repository = "https://git.paulbsd.com/paulbsd/ipblc"
[dependencies] [dependencies]
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["string"] } clap = { version = "4.2", features = ["string"] }
git-version = "0.3" git-version = "0.3"
ipnet = "2.9" ipnet = "2.7"
lazy_static = "1.4" lazy_static = "1.4"
mnl = "0.2" mnl = "0.2"
nftnl = "0.6" nftnl = "0.6"
nix = { version = "0.27", features = ["hostname", "inotify"] } nix = "0.26"
regex = "1.10" regex = "1.8"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json","rustls-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sd-notify = { version = "0.4" } tokio = { version = "1.28", features = ["full", "sync"] }
tokio = { version = "1.35", features = ["full", "sync"] } tungstenite = { version = "0.19", features = ["handshake","rustls-tls-native-roots"] }
tungstenite = { version = "0.21", features = ["handshake", "rustls-tls-native-roots"] }
## to optimize binary size (slow compile time) ## to optimize binary size (slow compile time)
#[profile.release] #[profile.release]

View File

@ -2,5 +2,5 @@ FROM rustembedded/cross:aarch64-unknown-linux-musl
RUN dpkg --add-architecture arm64 RUN dpkg --add-architecture arm64
RUN apt-get update RUN apt-get update
RUN apt-get install -y libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64 RUN apt-get install -y libasound2-dev:arm64 libzmq3-dev libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64
RUN apt-get clean RUN apt-get clean

View File

@ -4,8 +4,8 @@
## Summary ## Summary
ipblc is client-side intrusion prevention software working closely with ipbl ipblc is a tool that search and send attacking ip addresses to ipbl
It's pub/sub features are websockets based It's notification features are based on zeromq
## Howto ## Howto
@ -48,7 +48,6 @@ Options:
- ✅ Local bound tcp api socket - ✅ Local bound tcp api socket
- ✅ ZMQ -> Websocket - ✅ ZMQ -> Websocket
- ✅ Bug in RwLocks (agent often give up) - ✅ Bug in RwLocks (agent often give up)
- ❌ Create memory friendly structs for ipdata
### Notes ### Notes
@ -58,7 +57,7 @@ See [here](NOTES.md)
## License ## License
```text ```text
Copyright (c) 2022, 2023 PaulBSD Copyright (c) 2021, 2022, 2023 PaulBSD
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without Redistribution and use in source and binary forms, with or without

View File

@ -6,7 +6,7 @@ use chrono::Duration;
use clap::{Arg, ArgAction, ArgMatches, Command}; use clap::{Arg, ArgAction, ArgMatches, Command};
use git_version::git_version; use git_version::git_version;
use ipnet::IpNet; use ipnet::IpNet;
use nix::sys::inotify::{AddWatchFlags, Inotify, WatchDescriptor}; use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify, WatchDescriptor};
use regex::Regex; use regex::Regex;
use reqwest::{Client, Error as ReqError, Response}; use reqwest::{Client, Error as ReqError, Response};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -17,18 +17,18 @@ use std::path::Path;
pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]); pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]);
const MASTERSERVER: &str = "ipbl.paulbsd.com"; const MASTERSERVER: &str = "ipbl.paulbsd.com";
const WSSUBSCRIPTION: &str = "ipbl"; const WSSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY_INTERVAL: u64 = 2; const CONFIG_RETRY: u64 = 1;
const WEB_CLIENT_TIMEOUT: i64 = 5; const WEB_CLIENT_TIMEOUT: i64 = 5;
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct Context { pub struct Context {
pub blocklist: HashMap<String, BlockIpData>, pub blocklist: HashMap<String, BlockIpData>,
pub cfg: Config, pub cfg: Config,
pub discovery: Discovery, pub discovery: Discovery,
pub flags: Flags, pub flags: Flags,
pub instance: Box<Inotify>,
pub sas: HashMap<String, SetMap>, pub sas: HashMap<String, SetMap>,
pub hashwd: HashMap<String, WatchDescriptor>, pub hashwd: HashMap<String, WatchDescriptor>,
pub reloadinterval: isize,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -36,7 +36,7 @@ pub struct SetMap {
pub filename: String, pub filename: String,
pub fullpath: String, pub fullpath: String,
pub regex: Regex, pub regex: Regex,
pub set: SetCfg, pub set: Set,
pub watchedfiles: HashMap<String, u64>, pub watchedfiles: HashMap<String, u64>,
pub wd: WatchDescriptor, pub wd: WatchDescriptor,
} }
@ -48,7 +48,7 @@ pub struct Flags {
} }
impl Context { impl Context {
pub async fn new(inotify: &Inotify) -> Self { pub async fn new() -> Self {
// Get flags // Get flags
let argp: ArgMatches = Context::argparse(); let argp: ArgMatches = Context::argparse();
let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned(); let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned();
@ -63,13 +63,13 @@ impl Context {
urls: HashMap::new(), urls: HashMap::new(),
}, },
sas: HashMap::new(), sas: HashMap::new(),
instance: Box::new(Inotify::init(InitFlags::empty()).unwrap()),
blocklist: HashMap::new(), blocklist: HashMap::new(),
hashwd: HashMap::new(), hashwd: HashMap::new(),
reloadinterval: 5,
}; };
print!("Loading config ... "); print!("Loading config ... ");
ctx.load(&inotify).await.unwrap(); ctx.load().await.unwrap();
ctx ctx
} }
@ -103,17 +103,17 @@ impl Context {
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: Discovery = match req.json().await { let data: Discovery = match req.json().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
Ok(data) Ok(data)
} }
pub async fn load(&mut self, inotify: &Inotify) -> Result<(), Box<dyn std::error::Error>> { pub async fn load(&mut self) -> Result<(), Box<dyn std::error::Error>> {
if cfg!(test) { if cfg!(test) {
return Ok(()); return Ok(());
} }
@ -127,17 +127,17 @@ impl Context {
} }
break; break;
} }
Err(e) => { Err(err) => {
println!("error loading config: {e}, retrying in {CONFIG_RETRY_INTERVAL}s"); println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs");
last_in_err = true; last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await; sleep_s(CONFIG_RETRY).await;
} }
}; };
} }
if last_in_err { if last_in_err {
println!("creating sas"); println!("creating sas");
} }
self.create_sas(&inotify).await?; self.create_sas().await?;
if last_in_err { if last_in_err {
println!("created sas"); println!("created sas");
} }
@ -153,9 +153,9 @@ impl Context {
res res
} }
pub async fn get_blocklist_toblock(&self) -> Vec<IpData> { pub async fn get_blocklist_toblock(&mut self) -> Vec<IpData> {
let mut res: Vec<IpData> = vec![]; let mut res: Vec<IpData> = vec![];
for (_, block) in self.blocklist.iter() { for (_, block) in self.blocklist.iter_mut() {
match self.cfg.sets.get(&block.ipdata.src) { match self.cfg.sets.get(&block.ipdata.src) {
Some(set) => { Some(set) => {
if block.tryfail >= set.tryfail { if block.tryfail >= set.tryfail {
@ -169,41 +169,38 @@ impl Context {
} }
pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> { pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> {
match &ipevent.ipdata { match self.cfg.sets.get(&ipevent.ipdata.src) {
Some(ipdata) => match self.cfg.sets.get(&ipdata.src) { Some(set) => {
Some(set) => { let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str())
let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str()) .unwrap()
.unwrap() .with_timezone(&chrono::Local);
.with_timezone(&chrono::Local); let blocktime = set.blocktime;
let blocktime = set.blocktime; if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname { let block = self
let block = .blocklist
self.blocklist .entry(ipevent.ipdata.ip.to_string())
.entry(ipdata.ip.to_string()) .or_insert(BlockIpData {
.or_insert(BlockIpData { ipdata: ipevent.ipdata.clone(),
ipdata: ipdata.clone(), tryfail: 0,
tryfail: 0, starttime,
starttime, blocktime,
blocktime, });
}); block.tryfail += 1;
block.tryfail += 1; block.blocktime = blocktime;
block.blocktime = blocktime; if block.tryfail >= set.tryfail {
if block.tryfail >= set.tryfail { return Some(ipevent.clone());
return Some(ipevent.clone());
}
} else {
self.blocklist
.entry(ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
});
} }
} else {
self.blocklist
.entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipevent.ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
});
} }
None => {} }
},
None => {} None => {}
} }
None None
@ -231,17 +228,15 @@ impl Context {
removed removed
} }
pub async fn create_sas( pub async fn create_sas(&mut self) -> Result<(), Box<dyn std::error::Error>> {
&mut self,
inotify: &Inotify,
) -> Result<(), Box<dyn std::error::Error>> {
for (src, set) in self.cfg.sets.iter() { for (src, set) in self.cfg.sets.iter() {
let p = Path::new(set.path.as_str()); let p = Path::new(set.path.as_str());
if p.is_dir() { if p.is_dir() {
let wd = match self.hashwd.get(&set.path.to_string()) { let wd = match self.hashwd.get(&set.path.to_string()) {
Some(wd) => *wd, Some(wd) => *wd,
None => { None => {
let res = inotify let res = self
.instance
.add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY) .add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY)
.unwrap(); .unwrap();
self.hashwd.insert(set.path.to_string(), res); self.hashwd.insert(set.path.to_string(), res);
@ -287,7 +282,7 @@ impl Context {
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Config { pub struct Config {
pub sets: HashMap<String, SetCfg>, pub sets: HashMap<String, Set>,
#[serde(skip_serializing)] #[serde(skip_serializing)]
pub trustnets: Vec<String>, pub trustnets: Vec<String>,
pub ws: HashMap<String, WebSocketCfg>, pub ws: HashMap<String, WebSocketCfg>,
@ -299,7 +294,7 @@ impl Config {
Self { Self {
sets: HashMap::from([ sets: HashMap::from([
("smtp".to_string(), ("smtp".to_string(),
SetCfg { Set {
src: "smtp".to_string(), src: "smtp".to_string(),
filename: "mail.log".to_string(), filename: "mail.log".to_string(),
regex: "(SASL LOGIN authentication failed)".to_string(), regex: "(SASL LOGIN authentication failed)".to_string(),
@ -308,7 +303,7 @@ impl Config {
tryfail: 5, tryfail: 5,
}), }),
("ssh".to_string(), ("ssh".to_string(),
SetCfg { Set {
src: "ssh".to_string(), src: "ssh".to_string(),
filename: "auth.log".to_string(), filename: "auth.log".to_string(),
regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(), regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(),
@ -317,7 +312,7 @@ impl Config {
tryfail: 5, tryfail: 5,
},), },),
("http".to_string(), ("http".to_string(),
SetCfg { Set {
src: "http".to_string(), src: "http".to_string(),
filename: "".to_string(), filename: "".to_string(),
regex: "(anonymousfox.co)".to_string(), regex: "(anonymousfox.co)".to_string(),
@ -326,7 +321,7 @@ impl Config {
tryfail: 5, tryfail: 5,
},), },),
("openvpn".to_string(), ("openvpn".to_string(),
SetCfg { Set {
src: "openvpn".to_string(), src: "openvpn".to_string(),
filename: "status".to_string(), filename: "status".to_string(),
regex: "(UNDEF)".to_string(), regex: "(UNDEF)".to_string(),
@ -355,61 +350,97 @@ impl Config {
} }
pub async fn load(&mut self, server: &String) -> Result<(), ReqError> { pub async fn load(&mut self, server: &String) -> Result<(), ReqError> {
self.get_config(server).await?; self.get_global_config(server).await?;
self.get_trustnets(server).await?;
self.get_sets(server).await?;
self.get_ws_config(server).await?;
Ok(()) Ok(())
} }
async fn get_config(&mut self, server: &String) -> Result<(), ReqError> { async fn get_global_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> =
httpclient().get(format!("{server}/config")).send().await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: HashMap<String, GlobalConfig> = match req.json::<Vec<GlobalConfig>>().await {
Ok(res) => {
let mut out: HashMap<String, GlobalConfig> = HashMap::new();
res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.key.to_string(), x);
});
out
}
Err(err) => return Err(err),
};
let key = "".to_string();
self.api = data
.get(&key.to_string())
.unwrap_or(&GlobalConfig {
key: "api".to_string(),
value: "127.0.0.1:8060".to_string(),
})
.value
.clone();
Ok(())
}
async fn get_trustnets(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient() let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/config?v=2")) .get(format!("{server}/config/trustlist"))
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await { let data: Vec<String> = match req.json::<Vec<String>>().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
self.trustnets = data;
Ok(())
}
for d in data.sets { async fn get_sets(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/config/sets"))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<Set> = match req.json::<Vec<Set>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
for d in data {
self.sets.insert(d.src.clone(), d); self.sets.insert(d.src.clone(), d);
} }
self.trustnets = data.trustlists;
data.ws.into_iter().map(|x| x).for_each(|x| {
self.ws.insert(x.t.to_string(), x);
});
self.api = data
.cfg
.get(&"api".to_string())
.unwrap_or(&self.api)
.clone();
Ok(()) Ok(())
} }
pub async fn _get_last(server: &String) -> Result<Vec<IpData>, ReqError> { async fn get_ws_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp = httpclient() let resp: Result<Response, ReqError> =
.get(format!("{server}/ips/last")) httpclient().get(format!("{server}/config/ws")).send().await;
.query(&[("interval", "3 hours")])
.send()
.await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: HashMap<String, WebSocketCfg> = match req.json::<Vec<WebSocketCfg>>().await {
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await { Ok(res) => {
Ok(o) => o, let mut out: HashMap<String, WebSocketCfg> = HashMap::new();
Err(e) => return Err(e), res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.t.to_string(), x);
});
out
}
Err(err) => return Err(err),
}; };
self.ws = data;
Ok(data) Ok(())
} }
pub fn build_trustnets(&self) -> Vec<IpNet> { pub fn build_trustnets(&self) -> Vec<IpNet> {
@ -417,8 +448,8 @@ impl Config {
for trustnet in &self.trustnets { for trustnet in &self.trustnets {
match trustnet.parse() { match trustnet.parse() {
Ok(net) => trustnets.push(net), Ok(net) => trustnets.push(net),
Err(e) => { Err(err) => {
println!("error parsing {trustnet}, error: {e}"); println!("error parsing {trustnet}, error: {err}");
} }
}; };
} }
@ -430,7 +461,12 @@ impl Config {
msgtype: String::from("bootstrap"), msgtype: String::from("bootstrap"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: gethostname(true), hostname: gethostname(true),
ipdata: None, ipdata: IpData {
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
},
} }
} }
} }
@ -451,15 +487,13 @@ pub fn httpclient() -> Client {
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct GlobalConfigV2 { pub struct GlobalConfig {
pub cfg: HashMap<String, String>, pub key: String,
pub sets: Vec<SetCfg>, pub value: String,
pub trustlists: Vec<String>,
pub ws: Vec<WebSocketCfg>,
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SetCfg { pub struct Set {
pub src: String, pub src: String,
pub filename: String, pub filename: String,
pub regex: String, pub regex: String,
@ -488,13 +522,13 @@ pub struct URL {
pub path: String, pub path: String,
} }
impl PartialEq for SetCfg { impl PartialEq for Set {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.src == other.src self.src == other.src
} }
} }
impl Hash for SetCfg { impl Hash for Set {
fn hash<H: Hasher>(&self, state: &mut H) { fn hash<H: Hasher>(&self, state: &mut H) {
self.src.hash(state); self.src.hash(state);
} }
@ -504,12 +538,10 @@ impl Hash for SetCfg {
mod test { mod test {
use super::*; use super::*;
use crate::ip::*; use crate::ip::*;
use nix::sys::inotify::InitFlags;
use Context; use Context;
pub async fn prepare_test_data() -> Context { pub async fn prepare_test_data() -> Context {
let inotify = Inotify::init(InitFlags::empty()).unwrap(); let mut ctx = Context::new().await;
let mut ctx = Context::new(&inotify).await;
let now: DateTime<Local> = Local::now().trunc_subsecs(0); let now: DateTime<Local> = Local::now().trunc_subsecs(0);
ctx.blocklist = HashMap::new(); ctx.blocklist = HashMap::new();
@ -518,13 +550,12 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4,
ip: "1.1.1.1".to_string(), ip: "1.1.1.1".to_string(),
hostname: "test1".to_string(), hostname: "test1".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "ssh".to_string(), src: "ssh".to_string(),
}), },
}) })
.await; .await;
} }
@ -534,13 +565,12 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4,
ip: "1.1.1.2".to_string(), ip: "1.1.1.2".to_string(),
hostname: "test2".to_string(), hostname: "test2".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
} }
@ -549,13 +579,12 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4,
ip: "1.1.1.3".to_string(), ip: "1.1.1.3".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
@ -563,13 +592,12 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
@ -577,33 +605,19 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
})
.await;
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 6,
ip: "2a00:1450:4007:805::2003".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
}) })
.await; .await;
let ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap(); let mut ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap();
ip1.starttime = DateTime::from(now) - Duration::minutes(61); ip1.starttime = DateTime::from(now) - Duration::minutes(61);
let ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap(); let mut ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap();
ip2.starttime = DateTime::from(now) - Duration::minutes(62); ip2.starttime = DateTime::from(now) - Duration::minutes(62);
ctx ctx
} }
@ -613,14 +627,8 @@ mod test {
let ctx = prepare_test_data().await; let ctx = prepare_test_data().await;
let pending = ctx.get_blocklist_pending().await; let pending = ctx.get_blocklist_pending().await;
assert_eq!(pending.len(), 5); assert_eq!(pending.len(), 4);
let ips = [ let ips = ["1.1.1.1", "1.1.1.2", "1.1.1.3", "1.1.1.4"];
"1.1.1.1",
"1.1.1.2",
"1.1.1.3",
"1.1.1.4",
"2a00:1450:4007:805::2003",
];
for i in ips { for i in ips {
let ip = ctx let ip = ctx
.blocklist .blocklist
@ -638,7 +646,7 @@ mod test {
let mut ctx = prepare_test_data().await; let mut ctx = prepare_test_data().await;
ctx.gc_blocklist().await; ctx.gc_blocklist().await;
let toblock = ctx.get_blocklist_toblock().await; let toblock = ctx.get_blocklist_toblock().await;
assert_eq!(toblock.len(), 3); assert_eq!(toblock.len(), 2);
} }
#[tokio::test] #[tokio::test]

159
src/fw.rs
View File

@ -2,79 +2,13 @@ use crate::ip::IpData;
use crate::ipblc::PKG_NAME; use crate::ipblc::PKG_NAME;
use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table}; use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
use std::{ use std::{ffi::CString, io::Error, net::Ipv4Addr};
ffi::CString,
io::Error,
net::{Ipv4Addr, Ipv6Addr},
};
pub enum FwTableType { pub fn fwinit() -> (Batch, Table) {
IPv4,
IPv6,
}
pub fn fwglobalinit<'a>() -> ((Batch, Table), (Batch, Table)) {
let (batch4, table4) = fwinit(FwTableType::IPv4);
let (batch6, table6) = fwinit(FwTableType::IPv6);
((batch4, table4), (batch6, table6))
}
macro_rules! initrules {
($batch:expr, $table:expr, $chain:ident) => {
$chain.set_hook(nftnl::Hook::In, 1);
$chain.set_policy(nftnl::Policy::Accept);
$batch.add(&$chain, nftnl::MsgType::Add);
$batch.add(&Rule::new(&$chain), nftnl::MsgType::Del);
let mut rule = Rule::new(&$chain);
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept));
$batch.add(&rule, nftnl::MsgType::Add);
};
}
macro_rules! createrules {
($ipdata:ident, $chain:ident, $batch:ident, $t:ty, $ip_t:ident) => {
let mut rule = Rule::new(&$chain);
let ip = $ipdata.ip.parse::<$t>().unwrap();
rule.add_expr(&nft_expr!(payload $ip_t saddr));
rule.add_expr(&nft_expr!(cmp == ip));
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict drop));
$batch.add(&rule, nftnl::MsgType::Add);
}
}
fn fwinit(t: FwTableType) -> (Batch, Table) {
let table_name: String;
let table: Table;
match t {
FwTableType::IPv4 => {
table_name = format!("{PKG_NAME}4");
table = Table::new(
&CString::new(format!("{table_name}")).unwrap(),
ProtoFamily::Ipv4,
);
}
FwTableType::IPv6 => {
table_name = format!("{PKG_NAME}6");
table = Table::new(
&CString::new(format!("{table_name}")).unwrap(),
ProtoFamily::Ipv6,
);
}
}
let mut batch = Batch::new(); let mut batch = Batch::new();
let table = Table::new(&CString::new(PKG_NAME).unwrap(), ProtoFamily::Ipv4);
batch.add(&table, nftnl::MsgType::Add); batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del); batch.add(&table, nftnl::MsgType::Del);
@ -83,62 +17,53 @@ fn fwinit(t: FwTableType) -> (Batch, Table) {
} }
pub fn fwblock( pub fn fwblock(
ips_add_all: &Vec<IpData>, ips_add: &Vec<IpData>,
ret: &mut Vec<String>, ret: &mut Vec<String>,
fwlen: &mut usize, fwlen: &mut usize,
) -> std::result::Result<(), Error> { ) -> std::result::Result<(), Error> {
let ((mut batch4, table4), (mut batch6, table6)) = fwglobalinit(); // convert chain
let ips_add = convert(ips_add);
let (mut batch, table) = fwinit();
let mut chain4 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table4); // build chain
let mut chain6 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table6); let mut chain = Chain::new(&CString::new(PKG_NAME).unwrap(), &table);
chain.set_hook(nftnl::Hook::In, 1);
chain.set_policy(nftnl::Policy::Accept);
initrules!(batch4, table4, chain4); // add chain
initrules!(batch6, table6, chain6); batch.add(&chain, nftnl::MsgType::Add);
let mut factor = 1; let rule = Rule::new(&chain);
if ips_add_all.len() > 100 { batch.add(&rule, nftnl::MsgType::Del);
factor = (ips_add_all.len() / 10) as usize
}
let ips_add_tmp: Vec<IpData> = ips_add_all.clone().iter().map(|x| x.clone()).collect(); let mut rule = Rule::new(&chain);
let mut ips_add_iter = ips_add_tmp.chunks(factor); rule.add_expr(&nft_expr!(ct state));
let mut ips_add: Vec<&[IpData]> = vec![]; rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
while let Some(x) = ips_add_iter.next() { rule.add_expr(&nft_expr!(cmp != 0u32));
ips_add.push(x); rule.add_expr(&nft_expr!(counter));
} rule.add_expr(&nft_expr!(verdict accept));
batch.add(&rule, nftnl::MsgType::Add);
// build and add rules // build and add rules
for ipdata_group in ips_add.clone() { for ip in ips_add.clone() {
for ipdata in ipdata_group { let mut rule = Rule::new(&chain);
match ipdata.t { rule.add_expr(&nft_expr!(payload ipv4 saddr));
4 => { rule.add_expr(&nft_expr!(cmp == ip));
createrules!(ipdata, chain4, batch4, Ipv4Addr, ipv4); rule.add_expr(&nft_expr!(ct state));
} rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
6 => { rule.add_expr(&nft_expr!(cmp != 0u32));
createrules!(ipdata, chain6, batch6, Ipv6Addr, ipv6); rule.add_expr(&nft_expr!(counter));
} rule.add_expr(&nft_expr!(verdict drop));
_ => {} batch.add(&rule, nftnl::MsgType::Add);
}
}
} }
// validate and send batch // validate and send batch
for b in [batch4, batch6] { let finalized_batch = batch.finalize();
let bf = b.finalize(); send_and_process(&finalized_batch)?;
match send_and_process(&bf) { if fwlen != &mut ips_add.len() {
Ok(_) => {} ret.push(format!("{length} ip in firewall", length = ips_add.len()));
Err(e) => {
println!("error sending batch: {e}");
}
};
} }
if fwlen != &mut ips_add_all.len() { *fwlen = ips_add.len();
ret.push(format!(
"{length} ip in firewall",
length = ips_add_all.len()
));
}
*fwlen = ips_add_all.len();
Ok(()) Ok(())
} }
@ -169,3 +94,11 @@ fn socket_recv<'a>(
Ok(None) Ok(None)
} }
} }
fn convert(input: &Vec<IpData>) -> Vec<Ipv4Addr> {
let mut output: Vec<Ipv4Addr> = vec![];
for val in input {
output.push(val.ip.parse::<Ipv4Addr>().unwrap());
}
output
}

134
src/ip.rs
View File

@ -1,10 +1,11 @@
use crate::config::httpclient;
use crate::utils::gethostname; use crate::utils::gethostname;
use chrono::offset::LocalResult;
use chrono::prelude::*; use chrono::prelude::*;
use ipnet::IpNet; use ipnet::IpNet;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use reqwest::Error as ReqError;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp::Ordering; use std::cmp::Ordering;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
@ -22,27 +23,15 @@ pub struct IpEvent {
pub msgtype: String, pub msgtype: String,
pub mode: String, pub mode: String,
pub hostname: String, pub hostname: String,
pub ipdata: Option<IpData>, pub ipdata: IpData,
} }
#[macro_export] #[derive(Clone, Debug, Serialize, Deserialize, Eq)]
macro_rules! ipevent { pub struct IpData {
($msgtype:expr,$mode:expr,$hostname:expr,$ipdata:expr) => { pub ip: String,
IpEvent { pub src: String,
msgtype: String::from($msgtype), pub date: String,
mode: String::from($mode), pub hostname: String,
hostname: $hostname,
ipdata: $ipdata,
}
};
($msgtype:expr,$mode:expr,$hostname:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: None,
}
};
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -53,28 +42,6 @@ pub struct BlockIpData {
pub starttime: DateTime<Local>, pub starttime: DateTime<Local>,
} }
#[derive(Clone, Debug, Serialize, Deserialize, Eq)]
pub struct IpData {
pub t: isize,
pub ip: String,
pub src: String,
pub date: String,
pub hostname: String,
}
#[macro_export]
macro_rules! ipdata {
($t:expr,$ip:expr,$src:expr,$date:expr,$hostname:expr) => {
IpData {
t: $t.clone(),
ip: $ip.clone(),
src: $src.clone(),
date: $date.clone(),
hostname: $hostname.clone(),
}
};
}
impl PartialEq for IpData { impl PartialEq for IpData {
fn eq(&self, other: &IpData) -> bool { fn eq(&self, other: &IpData) -> bool {
self.ip.as_bytes() == other.ip.as_bytes() && self.src == other.src self.ip.as_bytes() == other.ip.as_bytes() && self.src == other.src
@ -110,7 +77,7 @@ impl Display for IpData {
} }
pub fn filter( pub fn filter(
reader: Box<dyn Read>, lines: Box<dyn Read>,
iplist: &mut Vec<IpData>, iplist: &mut Vec<IpData>,
trustnets: &Vec<IpNet>, trustnets: &Vec<IpNet>,
regex: &Regex, regex: &Regex,
@ -119,23 +86,19 @@ pub fn filter(
) -> isize { ) -> isize {
let mut ips = 0; let mut ips = 0;
let hostname = gethostname(true); let hostname = gethostname(true);
let lines = BufReader::new(reader).lines(); for line in BufReader::new(lines).lines() {
for line in lines.into_iter() {
if let Ok(l) = line { if let Ok(l) = line {
if regex.is_match(l.as_str()) { if regex.is_match(l.as_str()) {
let s_ipaddr: String; let s_ipaddr: String;
let t: isize;
match R_IPV4.captures(l.as_str()) { match R_IPV4.captures(l.as_str()) {
Some(sv4) => { Some(sv4) => {
s_ipaddr = sv4.get(0).unwrap().as_str().to_string(); s_ipaddr = sv4.get(0).unwrap().as_str().to_string();
t = 4;
} }
None => { None => {
match R_IPV6.captures(l.as_str()) { match R_IPV6.captures(l.as_str()) {
Some(sv6) => { Some(sv6) => {
s_ipaddr = sv6.get(0).unwrap().as_str().to_string(); s_ipaddr = sv6.get(0).unwrap().as_str().to_string();
t = 6;
} }
None => { None => {
continue; continue;
@ -144,14 +107,6 @@ pub fn filter(
} }
}; };
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
let s_date: DateTime<Local>; let s_date: DateTime<Local>;
match R_DATE.captures(l.as_str()) { match R_DATE.captures(l.as_str()) {
Some(sdt) => { Some(sdt) => {
@ -165,8 +120,21 @@ pub fn filter(
} }
}; };
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
if !is_trusted(&ipaddr, &trustnets) { if !is_trusted(&ipaddr, &trustnets) {
iplist.push(ipdata!(t, s_ipaddr, src, s_date.to_rfc3339(), hostname)); iplist.push(IpData {
ip: s_ipaddr,
src: src.to_owned(),
date: s_date.to_rfc3339().to_owned(),
hostname: hostname.to_owned(),
});
ips += 1; ips += 1;
}; };
} }
@ -176,24 +144,28 @@ pub fn filter(
} }
fn parse_date(input: regex::Captures) -> DateTime<Local> { fn parse_date(input: regex::Captures) -> DateTime<Local> {
let mut ymd: Vec<u32> = vec![]; let mut ymd: Vec<u64> = vec![];
let mut hms: Vec<u32> = vec![]; let mut hms: Vec<u64> = vec![];
let ymd_range = 2..5;
let hms_range = 5..8;
for cap in ymd_range { let (daterange, hourrange) = (2..5, 5..8);
ymd.push(input.get(cap).unwrap().as_str().parse::<u32>().unwrap());
for i in daterange {
ymd.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
} }
for cap in hms_range { for i in hourrange {
hms.push(input.get(cap).unwrap().as_str().parse::<u32>().unwrap()); hms.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
} }
let date: DateTime<Local> = let date = Local
match Local.with_ymd_and_hms(ymd[0] as i32, ymd[1], ymd[2], hms[0], hms[1], hms[2]) { .with_ymd_and_hms(
LocalResult::Single(s) => s, ymd[0] as i32,
LocalResult::Ambiguous(a, _b) => a, ymd[1] as u32,
LocalResult::None => Local::now().trunc_subsecs(0), ymd[2] as u32,
}; hms[0] as u32,
hms[1] as u32,
hms[2] as u32,
)
.unwrap();
date date
} }
@ -205,3 +177,23 @@ fn is_trusted(ip: &IpAddr, trustnets: &Vec<IpNet>) -> bool {
} }
false false
} }
pub async fn _get_last(server: &String) -> Result<Vec<IpData>, ReqError> {
let resp = httpclient()
.get(format!("{server}/ips/last"))
.query(&[("interval", "3 hours")])
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
}

View File

@ -1,7 +1,6 @@
use crate::config::{Context, GIT_VERSION}; use crate::config::{Context, GIT_VERSION};
use crate::fw::{fwblock, fwglobalinit}; use crate::fw::{fwblock, fwinit};
use crate::ip::{filter, IpData, IpEvent}; use crate::ip::{filter, IpData, IpEvent};
use crate::ipevent;
use crate::monitoring::apiserver; use crate::monitoring::apiserver;
use crate::utils::{gethostname, read_lines, sleep_s}; use crate::utils::{gethostname, read_lines, sleep_s};
use crate::webservice::send_to_ipbl_api; use crate::webservice::send_to_ipbl_api;
@ -10,8 +9,7 @@ use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep}
use chrono::prelude::*; use chrono::prelude::*;
use chrono::prelude::{DateTime, Local}; use chrono::prelude::{DateTime, Local};
use chrono::Duration; use chrono::Duration;
use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent}; use nix::sys::inotify::InotifyEvent;
use sd_notify::*;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
@ -22,25 +20,16 @@ const BL_CHAN_SIZE: usize = 32;
const WS_CHAN_SIZE: usize = 64; const WS_CHAN_SIZE: usize = 64;
const LOOP_MAX_WAIT: u64 = 5; const LOOP_MAX_WAIT: u64 = 5;
macro_rules! log_with_systemd {
($msg:expr) => {
println!("{}", $msg);
notify(false, &[NotifyState::Status(format!("{}", $msg).as_str())]).unwrap();
};
}
pub async fn run() { pub async fn run() {
let inotify = Inotify::init(InitFlags::empty()).unwrap(); let globalctx = Context::new().await;
let globalctx = Context::new(&inotify).await;
let ctxarc = Arc::new(RwLock::new(globalctx)); let ctxarc = Arc::new(RwLock::new(globalctx));
let mut fwlen: usize = 0; let mut fwlen: usize = 0;
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0); let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
log_with_systemd!(format!("Launching {}, version {}", PKG_NAME, pkgversion)); println!("Launching {}, version {}", PKG_NAME, pkgversion);
fwglobalinit(); fwinit();
let ctxapi = Arc::clone(&ctxarc); let ctxapi = Arc::clone(&ctxarc);
apiserver(&ctxapi).await.unwrap(); apiserver(&ctxapi).await.unwrap();
@ -58,9 +47,7 @@ pub async fn run() {
let mut wssocketrr = websocketreqrep(&ctxwsrr).await; let mut wssocketrr = websocketreqrep(&ctxwsrr).await;
// init file watcher // init file watcher
let inoarc = Arc::new(RwLock::new(inotify)); let mut blrx = watchfiles(&ctxarc).await;
let inoclone = Arc::clone(&inoarc);
let mut blrx = watchfiles(inoclone).await;
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
let ipeventclone = Arc::clone(&ipeventtxarc); let ipeventclone = Arc::clone(&ipeventtxarc);
@ -68,8 +55,6 @@ pub async fn run() {
compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await; compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
}); });
notify(false, &[NotifyState::Ready]).unwrap();
loop { loop {
let mut ret: Vec<String> = Vec::new(); let mut ret: Vec<String> = Vec::new();
@ -79,16 +64,22 @@ pub async fn run() {
ipevent = ipeventrx.recv() => { ipevent = ipeventrx.recv() => {
let received_ip = ipevent.unwrap(); let received_ip = ipevent.unwrap();
let (toblock,server) = { let (toblock,server);
let ctx = ctxclone.read().await; {
(ctx.get_blocklist_toblock().await,ctx.flags.server.clone()) let mut ctx = ctxclone.write().await;
}; toblock = ctx.get_blocklist_toblock().await;
server = ctx.flags.server.clone();
}
if received_ip.msgtype == "bootstrap".to_string() { if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in toblock { for ip_to_send in toblock {
let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send)); let ipe = IpEvent{
msgtype: String::from("init"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: ip_to_send,
};
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
break; break;
} }
@ -97,31 +88,32 @@ pub async fn run() {
} }
// refresh context blocklist // refresh context blocklist
let filtered_ipevent = { let filtered_ipevent;
ctxarc.write().await.update_blocklist(&received_ip).await {
}; let mut ctx = ctxarc.write().await;
filtered_ipevent = ctx.update_blocklist(&received_ip).await;
}
// send ip list to api and ws sockets // send ip list to api and ws sockets
if let Some(ipevent) = filtered_ipevent { if let Some(ipevent) = filtered_ipevent {
if received_ip.msgtype != "init" { if received_ip.msgtype != "init" {
log_with_systemd!(format!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip)); println!("sending {} to api and ws", ipevent.ipdata.ip);
let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata); let ipe = IpEvent{
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: ipevent.ipdata,
};
send_to_ipbl_api(&server.clone(), &ipe).await; send_to_ipbl_api(&server.clone(), &ipe).await;
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
wssocketrr.close(None).unwrap(); if !status {
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
continue; continue;
} }
} }
} }
} }
_val = sleep_s(LOOP_MAX_WAIT) => { _val = sleep_s(LOOP_MAX_WAIT) => {}
let ipe = ipevent!("ping", "ws", gethostname(true));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
}
}
}; };
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
@ -129,43 +121,19 @@ pub async fn run() {
// log lines // log lines
if ret.len() > 0 { if ret.len() > 0 {
let result = ret.join(", "); println!("{ret}", ret = ret.join(", "));
log_with_systemd!(format!("{result}"));
} }
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
let inoclone = Arc::clone(&inoarc); handle_cfg_reload(&ctxclone, &mut last_cfg_reload).await;
handle_cfg_reload(&ctxclone, &mut last_cfg_reload, inoclone).await;
} }
} }
async fn handle_cfg_reload( async fn handle_cfg_reload(ctxclone: &Arc<RwLock<Context>>, last_cfg_reload: &mut DateTime<Local>) {
ctxclone: &Arc<RwLock<Context>>,
last_cfg_reload: &mut DateTime<Local>,
inoarc: Arc<RwLock<Inotify>>,
) {
let now_cfg_reload = Local::now().trunc_subsecs(0); let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) { if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) {
let inotify; let mut ctx = ctxclone.write().await;
loop { match ctx.load().await {
inotify = match inoarc.try_read() {
Ok(o) => o,
Err(e) => {
println!("{e}");
sleep_s(1).await;
continue;
}
};
break;
}
let mut ctxtest = match ctxclone.try_write() {
Ok(o) => o,
Err(e) => {
println!("{e}");
return;
}
};
match ctxtest.load(&inotify).await {
Ok(_) => { Ok(_) => {
*last_cfg_reload = Local::now().trunc_subsecs(0); *last_cfg_reload = Local::now().trunc_subsecs(0);
} }
@ -177,29 +145,34 @@ async fn handle_cfg_reload(
} }
async fn handle_fwblock(ctxclone: Arc<RwLock<Context>>, ret: &mut Vec<String>, fwlen: &mut usize) { async fn handle_fwblock(ctxclone: Arc<RwLock<Context>>, ret: &mut Vec<String>, fwlen: &mut usize) {
{ let toblock = {
let mut ctx = ctxclone.write().await; let mut ctx = ctxclone.write().await;
ctx.gc_blocklist().await; ctx.gc_blocklist().await;
}
let toblock = {
let ctx = ctxclone.read().await;
ctx.get_blocklist_toblock().await ctx.get_blocklist_toblock().await
}; };
// apply firewall blocking // apply firewall blocking
match fwblock(&toblock, ret, fwlen) { match fwblock(&toblock, ret, fwlen) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(err) => {
println!("err: {e}, unable to push firewall rules, use super user") println!("Err: {err}, unable to push firewall rules, use super user")
} }
}; };
} }
async fn watchfiles(inoarc: Arc<RwLock<Inotify>>) -> Receiver<FileEvent> { async fn watchfiles(ctxarc: &Arc<RwLock<Context>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE); let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
let ctxclone = Arc::clone(ctxarc);
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let events = inoarc.read().await.read_events().unwrap(); let events;
let instance;
{
let ctx = ctxclone.read().await;
instance = ctx.instance.clone();
}
events = instance.read_events().unwrap();
for inevent in events { for inevent in events {
let date: DateTime<Local> = Local::now().trunc_subsecs(0); let date: DateTime<Local> = Local::now().trunc_subsecs(0);
@ -212,12 +185,12 @@ async fn watchfiles(inoarc: Arc<RwLock<Inotify>>) -> Receiver<FileEvent> {
async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, bool) { async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, bool) {
let currentlen = match std::fs::metadata(&path.to_string()) { let currentlen = match std::fs::metadata(&path.to_string()) {
Ok(u) => u.len(), Ok(u) => u.len().clone(),
Err(_) => 0u64, Err(_) => 0u64,
}; };
let lastlen = match w.insert(path.to_string(), currentlen) { let lastlen = match w.insert(path.to_string(), currentlen) {
Some(u) => u, Some(u) => u,
None => currentlen, None => 0u64,
}; };
(lastlen, lastlen != currentlen) (lastlen, lastlen != currentlen)
} }
@ -232,16 +205,20 @@ async fn compare_files_changes(
let modfiles = inrx.recv().await.unwrap(); let modfiles = inrx.recv().await.unwrap();
let mut iplist: Vec<IpData> = vec![]; let mut iplist: Vec<IpData> = vec![];
let sas = { let sask;
let sas;
{
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
sas = ctx.clone().sas;
sask = sas.keys();
tnets = ctx.cfg.build_trustnets(); tnets = ctx.cfg.build_trustnets();
ctx.sas.clone() }
};
match modfiles.inevent.name { match modfiles.inevent.name {
Some(name) => { Some(name) => {
let filename = name.to_str().unwrap(); let filename = name.to_str().unwrap();
for (sak, sa) in sas.clone().iter_mut() { for sak in sask {
let sa = sas.get(sak).unwrap();
if modfiles.inevent.wd == sa.wd { if modfiles.inevent.wd == sa.wd {
let handle: String; let handle: String;
if sa.filename.as_str() == "" { if sa.filename.as_str() == "" {
@ -252,11 +229,13 @@ async fn compare_files_changes(
continue; continue;
} }
let (filesize, sizechanged) = { let (filesize, sizechanged);
{
let mut ctx = ctxarc.write().await; let mut ctx = ctxarc.write().await;
let sa = ctx.sas.get_mut(sak).unwrap(); let sa = ctx.sas.get_mut(sak).unwrap();
get_last_file_size(&mut sa.watchedfiles, &handle).await (filesize, sizechanged) =
}; get_last_file_size(&mut sa.watchedfiles, &handle).await;
}
if !sizechanged { if !sizechanged {
continue; continue;
@ -279,9 +258,14 @@ async fn compare_files_changes(
} }
} }
for ip in iplist { for ip in iplist {
let ipe = ipevent!("add", "file", gethostname(true), Some(ip)); let ipevent = IpEvent {
let ipetx = ipeventtx.read().await; msgtype: String::from("add"),
ipetx.send(ipe).await.unwrap(); hostname: gethostname(true),
mode: String::from("file"),
ipdata: ip,
};
let ipetx = ipeventtx.write().await;
ipetx.send(ipevent).await.unwrap();
} }
} }
None => {} None => {}

View File

@ -4,72 +4,54 @@ use serde_json;
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener; use tokio::net::TcpSocket;
use tokio::sync::RwLock; use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> { pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxarc = ctxarc.clone(); let ctxarc = ctxarc.clone();
let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() }; let addr;
let listener = match TcpListener::bind(addr).await { {
Ok(o) => o, let ctx = ctxarc.read().await;
Err(e) => { addr = ctx.cfg.api.parse().unwrap();
println!("error: {e}"); }
std::process::exit(1);
} let socket = TcpSocket::new_v4().unwrap();
}; socket.bind(addr).unwrap();
socket.set_reuseaddr(true).unwrap();
let listener = socket.listen(1024).unwrap();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
//apitx.send(String::from("")).await.unwrap();
match listener.accept().await { match listener.accept().await {
Ok((mut socket, _addr)) => { Ok((stream, _addr)) => {
let mut buf = vec![0; 1024]; //let mut buf = [0; 1024];
let data;
match socket.readable().await { {
Ok(_) => { let ctx = ctxarc.read().await;
match socket.try_read(&mut buf) { data = serde_json::to_string(&ctx.blocklist);
Ok(_) => {}
Err(e) => {
println!("error: {e}");
continue;
}
};
}
Err(e) => {
println!("error: {e}");
continue;
}
} }
let msg = match String::from_utf8(buf.to_vec()) { match data {
Ok(o) => o.trim_matches(char::from(0)).trim().to_string(), Ok(dt) => {
Err(_) => "".to_string(), let (_reader, mut writer) = stream.into_split();
}; match writer.write_all(format!("{dt}").as_bytes()).await {
Ok(_) => {}
let res = format_result(&ctxarc, msg.as_str()).await; Err(err) => {
println!("{err}");
match socket.write_all(res.as_bytes()).await { }
Ok(_) => {} }
Err(e) => { }
println!("error: {e}"); Err(err) => {
println!("unable to serialize data: {err}");
} }
} }
} }
Err(err) => { Err(err) => {
println!("error: {err}"); println!("couldn't get client: {}", err)
} }
} }
} }
}); });
Ok(()) Ok(())
} }
async fn format_result(ctxarc: &Arc<RwLock<Context>>, mode: &str) -> String {
let data;
let ctx = ctxarc.read().await;
match mode {
"cfg" => data = serde_json::to_string(&ctx.cfg).unwrap(),
"blocklist" => data = serde_json::to_string(&ctx.blocklist).unwrap(),
_ => data = serde_json::to_string(&ctx.blocklist).unwrap(),
};
data
}

View File

@ -1,28 +0,0 @@
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}
pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
// Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
}
pub async fn _sleep_ms(ms: u64) {
sleep(Duration::from_millis(ms)).await;
}

View File

@ -1 +1 @@
(((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*)|(((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?) ((^\s*((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*$)|(^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$))

View File

@ -1,14 +1,21 @@
use lazy_static::lazy_static;
use nix::unistd; use nix::unistd;
use regex::Regex;
use std::boxed::Box; use std::boxed::Box;
use std::fs::File; use std::fs::File;
use std::io::*; use std::io::*;
use std::path::Path;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
lazy_static! {
static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap();
}
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> { pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
let mut file = match File::open(filename) { let mut file = match File::open(filename) {
Ok(o) => o, Ok(f) => f,
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
return None; return None;
} }
}; };
@ -17,13 +24,21 @@ pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
Some(lines) Some(lines)
} }
pub async fn sleep_s(s: u64) { pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
sleep(Duration::from_secs(s)).await; // Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
} }
#[allow(dead_code)] pub async fn _sleep_ms(ms: u64) {
pub async fn sleep_ms(m: u64) { sleep(Duration::from_millis(ms)).await;
sleep(Duration::from_millis(m)).await; }
pub async fn sleep_s(s: u64) {
sleep(Duration::from_secs(s)).await;
} }
pub fn gethostname(show_fqdn: bool) -> String { pub fn gethostname(show_fqdn: bool) -> String {
@ -38,3 +53,19 @@ pub fn gethostname(show_fqdn: bool) -> String {
} }
hostname[0].to_string() hostname[0].to_string()
} }
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}

View File

@ -11,12 +11,12 @@ pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut try_req = 0; let mut try_req = 0;
let client = httpclient(); let client = httpclient();
loop { loop {
match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await { match push_ip(&client, &server, &ip.ipdata).await {
Ok(_) => { Ok(_) => {
break; break;
} }
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
sleep_s(1).await; sleep_s(1).await;
if try_req == MAX_FAILED_API_RATE { if try_req == MAX_FAILED_API_RATE {
break; break;
@ -31,7 +31,6 @@ async fn push_ip(client: &Client, server: &str, ip: &IpData) -> Result<(), ReqEr
let mut data: Vec<IpData> = vec![]; let mut data: Vec<IpData> = vec![];
data.push(IpData { data.push(IpData {
t: ip.t,
ip: ip.ip.to_string(), ip: ip.ip.to_string(),
src: ip.src.to_string(), src: ip.src.to_string(),
date: ip.date.to_string(), date: ip.date.to_string(),
@ -57,7 +56,6 @@ async fn _push_ip_bulk(
for ip in ips { for ip in ips {
data.push(IpData { data.push(IpData {
t: ip.t,
ip: ip.ip.to_string(), ip: ip.ip.to_string(),
src: ip.src.to_string(), src: ip.src.to_string(),
date: ip.date.to_string(), date: ip.date.to_string(),

View File

@ -14,13 +14,13 @@ use tungstenite::*;
pub async fn websocketreqrep( pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> { ) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event, wscfg); let (mut wssocketrr, bootstrap_event, cfg);
{ {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
bootstrap_event = ctx.cfg.bootstrap_event().clone(); bootstrap_event = ctx.cfg.bootstrap_event().clone();
wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone(); cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
} }
wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap(); wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr; return wssocketrr;
@ -41,28 +41,19 @@ pub async fn websocketpubsub(
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let mut ws = websocket.write().await; let mut ws = websocket.write().await;
match ws.read() { match ws.read_message() {
Ok(msg) => { Ok(msg) => {
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) { let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
Ok(o) => o, Ok(o) => o,
Err(e) => { Err(_e) => {
println!("error in pubsub: {e:?}");
continue; continue;
} }
}; };
match tosend.ipdata.clone() { if tosend.ipdata.hostname != gethostname(true)
Some(o) => { || tosend.msgtype == "init".to_string()
if o.hostname != gethostname(true) {
|| tosend.msgtype == "init".to_string() let txps = txpubsub.write().await;
{ txps.send(tosend).await.unwrap();
let txps = txpubsub.read().await;
txps.send(tosend).await.unwrap();
}
}
None => {
let txps = txpubsub.read().await;
txps.send(tosend.clone()).await.unwrap();
}
} }
} }
Err(e) => { Err(e) => {
@ -99,7 +90,9 @@ pub async fn websocketconnect<'a>(
} }
println!("connected to {endpoint}"); println!("connected to {endpoint}");
let msg = json!({ "hostname": hostname }); let msg = json!({ "hostname": hostname });
socket.send(Message::Text(msg.to_string())).unwrap(); socket
.write_message(Message::Text(msg.to_string()))
.unwrap();
Ok(socket) Ok(socket)
} }
@ -110,30 +103,32 @@ pub async fn send_to_ipbl_websocket(
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
if ws.can_write() { if ws.can_write() {
match ws.send(Message::Text(msg)) { match ws.write_message(Message::Text(msg)) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err send read: {e:?}"); println!("err send read: {e:?}");
return false; return handle_websocket_error(ws);
} }
}; };
} else { } else {
println!("can't write to socket"); return handle_websocket_error(ws);
return false;
}; };
if ws.can_read() { if ws.can_read() {
match ws.read() { match ws.read_message() {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err send read: {e:?}"); println!("err send read: {e:?}");
return false; return handle_websocket_error(ws);
} }
}; };
} else { } else {
println!("can't read from socket"); return handle_websocket_error(ws);
return false;
}; };
true true
} }
fn handle_websocket_error(ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> bool {
ws.close(None).unwrap();
return false;
}