From 36f892cf422b59388d7f6be73bc701450a1b32fe Mon Sep 17 00:00:00 2001 From: Paul Lecuq Date: Sun, 12 Mar 2023 14:27:05 +0100 Subject: [PATCH] updated websocket branch --- README.md | 2 +- src/ipblc.rs | 64 +++++++++++++++++++++--------------------------- src/websocket.rs | 41 ++++++++++++++----------------- 3 files changed, 48 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index 434174a..84f88ab 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ See [here](NOTES.md) ## License ```text -Copyright (c) 2021, 2022 PaulBSD +Copyright (c) 2021, 2022, 2023 PaulBSD All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/src/ipblc.rs b/src/ipblc.rs index 91edfe9..f0e1181 100644 --- a/src/ipblc.rs +++ b/src/ipblc.rs @@ -18,7 +18,6 @@ use tokio::sync::RwLock; pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); const BL_CHAN_SIZE: usize = 32; const ZMQ_CHAN_SIZE: usize = 64; -const API_CHAN_SIZE: usize = 64; pub async fn run() { let fqdn = gethostname(true); @@ -33,33 +32,28 @@ pub async fn run() { println!("Launching {}, version {}", PKG_NAME, pkgversion); fwinit(); - let (_apitx, mut apirx): (Sender, Receiver) = channel(API_CHAN_SIZE); - - let ctxclone = Arc::clone(&ctxarc); - apiserver(&ctxclone).await.unwrap(); + apiserver(&ctxarc).await.unwrap(); // initialize sockets let (ipeventtx, mut ipeventrx): (Sender, Receiver) = channel(ZMQ_CHAN_SIZE); - let (wspubsubsocket, mut wsreqsocket) = websocketinit(&ctxclone, &ipeventtx).await; + let wssocket = websocketinit(&ctxarc, &ipeventtx).await; - let mut blrx = watchfiles(&ctxclone).await; + let mut blrx = watchfiles(&ctxarc).await; let ctxclone = Arc::clone(&ctxarc); tokio::spawn(async move { compare_files_changes(&ctxclone, &mut blrx, &ipeventtx).await; }); - let ctxclone = Arc::clone(&ctxarc); - let bootstrap_event = ctxclone.read().await.cfg.bootstrap_event().clone(); - send_to_ipbl_websocket(&mut wsreqsocket, &bootstrap_event, &mut ret).await; + let bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); + + send_to_ipbl_websocket(&wssocket, &bootstrap_event, &mut ret).await; loop { ret = Vec::new(); // wait for logs parse and zmq channel receive - //let mut received_ip = ipdatarx.recv(); let ipdata_wait = ipeventrx.recv(); - let apimsg_wait = apirx.recv(); let force_wait = sleep_ms(200); let ctxclone = Arc::clone(&ctxarc); @@ -78,7 +72,7 @@ pub async fn run() { hostname: fqdn.clone(), ipdata: ip_to_send, }; - send_to_ipbl_websocket(&mut wsreqsocket, &ipe, &mut ret).await; + send_to_ipbl_websocket(&wssocket, &ipe, &mut ret).await; } continue } @@ -97,11 +91,10 @@ pub async fn run() { ipdata: ipevent.ipdata, }; send_to_ipbl_api(&ctx, &event, &mut ret).await; - send_to_ipbl_websocket(&mut wsreqsocket, &event, &mut ret).await; + send_to_ipbl_websocket(&wssocket, &event, &mut ret).await; } } } - _val = apimsg_wait => {} _val = force_wait => {} }; @@ -110,35 +103,34 @@ pub async fn run() { let mut ctx = ctxarc.write().await; ctx.gc_blocklist().await; toblock = ctx.get_blocklist_toblock().await; + + // apply firewall blocking + match fwblock(&toblock, &mut ret, &mut fwlen) { + Ok(_) => {} + Err(err) => { + println!("Err: {err}, unable to push firewall rules, use super user") + } + }; } - // apply firewall blocking - match fwblock(&toblock, &mut ret, &mut fwlen) { - Ok(_) => {} - Err(err) => { - println!("Err: {err}, unable to push firewall rules, use super user") - } - }; // log lines if ret.len() > 0 { println!("{ret}", ret = ret.join(", ")); } - { - let now_cfg_reload = Local::now().trunc_subsecs(0); - if (now_cfg_reload - last_cfg_reload) > Duration::seconds(5) { - // reload configuration from the server - let mut ctx = ctxclone.write().await; - match ctx.load().await { - Ok(_) => { - last_cfg_reload = Local::now().trunc_subsecs(0); - } - Err(err) => { - println!("error loading config: {err}"); - } + let now_cfg_reload = Local::now().trunc_subsecs(0); + if (now_cfg_reload - last_cfg_reload) > Duration::seconds(5) { + // reload configuration from the server + let mut ctx = ctxclone.write().await; + match ctx.load().await { + Ok(_) => { + last_cfg_reload = Local::now().trunc_subsecs(0); } - }; - } + Err(err) => { + println!("error loading config: {err}"); + } + } + }; } } diff --git a/src/websocket.rs b/src/websocket.rs index 3fcbed7..b70f375 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -1,49 +1,42 @@ use crate::config::{Context, WebSocketCfg}; use crate::ip::IpEvent; use crate::utils::gethostname; -use std::net::TcpStream; -use tungstenite::stream::*; -use tungstenite::*; +use std::net::TcpStream; use std::sync::Arc; use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; +use tungstenite::stream::*; +use tungstenite::*; pub async fn websocketconnect<'a>( wscfg: &WebSocketCfg, ) -> Result>, Error> { - let (socket, response) = connect(&wscfg.endpoint).expect("Can't connect"); + let (socket, _response) = connect(&wscfg.endpoint).expect("Can't connect"); Ok(socket) } pub async fn websocketinit( ctx: &Arc>, ipeventtx: &Sender, -) -> ( - Arc>>>, - WebSocket>, -) { +) -> Arc>>> { let ctxarc = Arc::clone(&ctx); - let wsreqsocket; - let wssubsocket; - let wssubsocketcb; + let wssocket; + let wssocketcb; { let ctx = ctxarc.read().await; - wsreqsocket = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap()) - .await - .unwrap(); - wssubsocket = Arc::new(RwLock::new( - websocketconnect(&ctx.cfg.ws.get("pubsub").unwrap()) + wssocket = Arc::new(RwLock::new( + websocketconnect(&ctx.cfg.ws.get("websocket").unwrap()) .await .unwrap(), )); - wssubsocketcb = wssubsocket.clone(); + wssocketcb = wssocket.clone(); } - wslistenpubsub(wssubsocket, ipeventtx.clone()).await; - return (wssubsocketcb, wsreqsocket); + wslistenpubsub(wssocket, ipeventtx.clone()).await; + return wssocketcb; } async fn wslistenpubsub( @@ -83,18 +76,22 @@ async fn wslistenpubsub( } pub async fn send_to_ipbl_websocket( - reqsocket: &mut WebSocket>, + wssocket: &Arc>>>, ip: &IpEvent, _ret: &mut Vec, ) { let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); - match reqsocket.write_message(Message::Text(msg)) { + + let wsclone = Arc::clone(wssocket); + let mut ws = wsclone.write().await; + + match ws.write_message(Message::Text(msg)) { Ok(_) => {} Err(e) => { println!("{e:?}") } }; - match reqsocket.read_message() { + match ws.read_message() { Ok(o) => { println!("{o}") }