some code refactor
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing

This commit is contained in:
Paul 2023-05-10 21:32:27 +02:00
parent 2134f09210
commit a720562c3c
6 changed files with 37 additions and 35 deletions

View File

@ -26,7 +26,6 @@ pub struct Context {
pub cfg: Config, pub cfg: Config,
pub discovery: Discovery, pub discovery: Discovery,
pub flags: Flags, pub flags: Flags,
pub hostname: String,
pub instance: Box<Inotify>, pub instance: Box<Inotify>,
pub sas: HashMap<String, SetMap>, pub sas: HashMap<String, SetMap>,
pub hashwd: HashMap<String, WatchDescriptor>, pub hashwd: HashMap<String, WatchDescriptor>,
@ -52,7 +51,6 @@ impl Context {
pub async fn new() -> Self { pub async fn new() -> Self {
// Get flags // Get flags
let argp: ArgMatches = Context::argparse(); let argp: ArgMatches = Context::argparse();
//let debug: bool = argp.contains_id("debug");
let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned(); let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned();
let server: String = argp.get_one::<String>("server").unwrap().to_string(); let server: String = argp.get_one::<String>("server").unwrap().to_string();
@ -60,7 +58,6 @@ impl Context {
let mut ctx = Context { let mut ctx = Context {
cfg: Config::new(), cfg: Config::new(),
flags: Flags { debug, server }, flags: Flags { debug, server },
hostname: gethostname(true),
discovery: Discovery { discovery: Discovery {
version: "1.0".to_string(), version: "1.0".to_string(),
urls: HashMap::new(), urls: HashMap::new(),
@ -178,7 +175,7 @@ impl Context {
.unwrap() .unwrap()
.with_timezone(&chrono::Local); .with_timezone(&chrono::Local);
let blocktime = set.blocktime; let blocktime = set.blocktime;
if ipevent.mode == "file".to_string() && self.hostname == ipevent.hostname { if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
let block = self let block = self
.blocklist .blocklist
.entry(ipevent.ipdata.ip.to_string()) .entry(ipevent.ipdata.ip.to_string())

View File

@ -78,7 +78,7 @@ impl Display for IpData {
pub fn filter( pub fn filter(
lines: Box<dyn Read>, lines: Box<dyn Read>,
list: &mut Vec<IpData>, iplist: &mut Vec<IpData>,
trustnets: &Vec<IpNet>, trustnets: &Vec<IpNet>,
regex: &Regex, regex: &Regex,
src: &String, src: &String,
@ -129,7 +129,7 @@ pub fn filter(
}; };
if !is_trusted(&ipaddr, &trustnets) { if !is_trusted(&ipaddr, &trustnets) {
list.push(IpData { iplist.push(IpData {
ip: s_ipaddr, ip: s_ipaddr,
src: src.to_owned(), src: src.to_owned(),
date: s_date.to_rfc3339().to_owned(), date: s_date.to_rfc3339().to_owned(),

View File

@ -64,10 +64,15 @@ pub async fn run() {
ipevent = ipeventrx.recv() => { ipevent = ipeventrx.recv() => {
let received_ip = ipevent.unwrap(); let received_ip = ipevent.unwrap();
let mut ctx = ctxclone.write().await; let (toblock,server);
{
let mut ctx = ctxclone.write().await;
toblock = ctx.get_blocklist_toblock().await;
server = ctx.flags.server.clone();
}
if received_ip.msgtype == "bootstrap".to_string() { if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in ctx.get_blocklist_toblock().await { for ip_to_send in toblock {
let ipe = IpEvent{ let ipe = IpEvent{
msgtype: String::from("init"), msgtype: String::from("init"),
mode: String::from("ws"), mode: String::from("ws"),
@ -75,17 +80,19 @@ pub async fn run() {
ipdata: ip_to_send, ipdata: ip_to_send,
}; };
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
drop(ctx);
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
break; break;
} }
} }
continue continue
} }
// refresh context blocklist // refresh context blocklist
let filtered_ipevent = ctx.update_blocklist(&received_ip).await; let filtered_ipevent;
{
let mut ctx = ctxarc.write().await;
filtered_ipevent = ctx.update_blocklist(&received_ip).await;
}
// send ip list to api and ws sockets // send ip list to api and ws sockets
if let Some(ipevent) = filtered_ipevent { if let Some(ipevent) = filtered_ipevent {
@ -97,9 +104,8 @@ pub async fn run() {
hostname: gethostname(true), hostname: gethostname(true),
ipdata: ipevent.ipdata, ipdata: ipevent.ipdata,
}; };
send_to_ipbl_api(&ctx.flags.server, &ipe).await; send_to_ipbl_api(&server.clone(), &ipe).await;
let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
drop(ctx);
if !status { if !status {
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
continue; continue;

View File

@ -9,6 +9,5 @@ mod websocket;
#[tokio::main] #[tokio::main]
pub async fn main() { pub async fn main() {
// Create a new context
ipblc::run().await; ipblc::run().await;
} }

View File

@ -8,7 +8,7 @@ use reqwest::Error as ReqError;
const MAX_FAILED_API_RATE: u64 = 10; const MAX_FAILED_API_RATE: u64 = 10;
pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) { pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut i = 0; let mut try_req = 0;
let client = httpclient(); let client = httpclient();
loop { loop {
match push_ip(&client, &server, &ip.ipdata).await { match push_ip(&client, &server, &ip.ipdata).await {
@ -18,10 +18,10 @@ pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
Err(err) => { Err(err) => {
println!("{err}"); println!("{err}");
sleep_s(1).await; sleep_s(1).await;
if i == MAX_FAILED_API_RATE { if try_req == MAX_FAILED_API_RATE {
break; break;
} }
i += 1; try_req += 1;
} }
}; };
} }

View File

@ -14,16 +14,14 @@ use tungstenite::*;
pub async fn websocketreqrep( pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> { ) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event); let (mut wssocketrr, bootstrap_event, cfg);
{ {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); bootstrap_event = ctx.cfg.bootstrap_event().clone();
cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone())
.await
.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
} }
wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr; return wssocketrr;
} }
@ -32,12 +30,13 @@ pub async fn websocketpubsub(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
txpubsub: Arc<RwLock<Sender<IpEvent>>>, txpubsub: Arc<RwLock<Sender<IpEvent>>>,
) { ) {
let ctx = ctxarc.read().await; let cfg;
let cfg = ctx.cfg.ws.get("pubsub").unwrap().clone(); {
let hostname = ctx.hostname.clone(); let ctx = ctxarc.read().await;
drop(ctx); cfg = ctx.cfg.ws.get("pubsub").unwrap().clone();
}
let mut websocket = Arc::new(RwLock::new( let mut websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, hostname.clone()).await.unwrap(), websocketconnect(&cfg, &gethostname(true)).await.unwrap(),
)); ));
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
@ -62,7 +61,7 @@ pub async fn websocketpubsub(
ws.close(None).unwrap(); ws.close(None).unwrap();
drop(ws); drop(ws);
websocket = Arc::new(RwLock::new( websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, hostname.clone()).await.unwrap(), websocketconnect(&cfg, &gethostname(true)).await.unwrap(),
)); ));
} }
}; };
@ -72,23 +71,24 @@ pub async fn websocketpubsub(
pub async fn websocketconnect<'a>( pub async fn websocketconnect<'a>(
wscfg: &WebSocketCfg, wscfg: &WebSocketCfg,
hostname: String, hostname: &String,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> { ) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> {
print!("connecting to {} ... ", &wscfg.endpoint); let endpoint = &wscfg.endpoint;
print!("connecting to {} ... ", endpoint);
io::stdout().flush().unwrap(); io::stdout().flush().unwrap();
let mut socket; let mut socket;
loop { loop {
(socket, _) = match connect(&wscfg.endpoint) { (socket, _) = match connect(endpoint) {
Ok((o, e)) => (o, e), Ok((o, e)) => (o, e),
_ => { _ => {
println!("error connecting to {}, retrying", &wscfg.endpoint); println!("error connecting to {endpoint}, retrying");
sleep_s(1).await; sleep_s(1).await;
continue; continue;
} }
}; };
break; break;
} }
println!("connected to {}", &wscfg.endpoint); println!("connected to {endpoint}");
let msg = json!({ "hostname": hostname }); let msg = json!({ "hostname": hostname });
socket socket
.write_message(Message::Text(msg.to_string())) .write_message(Message::Text(msg.to_string()))