From 47cc30e79e096ef0de66623c0810d8b98a809dac Mon Sep 17 00:00:00 2001 From: Paul Lecuq Date: Sun, 9 Apr 2023 01:42:17 +0200 Subject: [PATCH] updated ipblc websocket feat --- Cargo.lock | 146 ++++++++++++++++++++++++---------- {src => old}/zmqcom.rs | 0 src/ipblc.rs | 33 ++++---- src/main.rs | 2 +- src/{api.rs => monitoring.rs} | 12 +-- src/webservice.rs | 7 +- src/websocket.rs | 111 ++++++++++++++++++-------- 7 files changed, 217 insertions(+), 94 deletions(-) rename {src => old}/zmqcom.rs (100%) rename src/{api.rs => monitoring.rs} (88%) diff --git a/Cargo.lock b/Cargo.lock index 9d9b575..5457d65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -204,9 +204,9 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" [[package]] name = "cpufeatures" @@ -306,13 +306,13 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d6a0976c999d473fe89ad888d5a284e55366d9dc9038b1ba2aa15128c4afa0" +checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" dependencies = [ "errno-dragonfly", "libc", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -391,9 +391,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" +checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" dependencies = [ "cfg-if", "libc", @@ -535,9 +535,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.54" +version = "0.1.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c17cc76786e99f8d2f055c11159e7f0091c42474dcc3189fbab96072e873e6d" +checksum = "0722cd7114b7de04316e7ea5456a0bbb20e4adb46fd27a3697adb812cff0f37c" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -579,13 +579,13 @@ dependencies = [ [[package]] name = "io-lifetimes" -version = "1.0.9" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09270fd4fa1111bc614ed2246c7ef56239a3063d5be0d1ec3b589c505d400aeb" +checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220" dependencies = [ "hermit-abi 0.3.1", "libc", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -616,14 +616,14 @@ checksum = "12b6ee2129af8d4fb011108c73d99a1b83a85977f23b82460c0ae2e25bb4b57f" [[package]] name = "is-terminal" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "256017f749ab3117e93acb91063009e1f1bb56d03965b14c2c8df4eb02c524d8" +checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" dependencies = [ "hermit-abi 0.3.1", "io-lifetimes", "rustix", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -649,9 +649,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.140" +version = "0.2.141" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" +checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5" [[package]] name = "link-cplusplus" @@ -904,9 +904,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.55" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d0dd4be24fcdcfeaa12a432d588dc59bbad6cad3510c67e74a2b6b2fc950564" +checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" dependencies = [ "unicode-ident", ] @@ -1032,16 +1032,16 @@ dependencies = [ [[package]] name = "rustix" -version = "0.37.6" +version = "0.37.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d097081ed288dfe45699b72f5b5d648e5f15d64d900c7080273baa20c16a6849" +checksum = "85597d61f83914ddeba6a47b3b8ffe7365107221c2e557ed94426489fefb5f77" dependencies = [ "bitflags", "errno", "io-lifetimes", "libc", "linux-raw-sys", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -1671,11 +1671,11 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows" -version = "0.46.0" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdacb41e6a96a052c6cb63a144f24900236121c6f63f4f8219fef5977ecb0c25" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" dependencies = [ - "windows-targets", + "windows-targets 0.48.0", ] [[package]] @@ -1684,13 +1684,13 @@ version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", ] [[package]] @@ -1699,7 +1699,16 @@ version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" dependencies = [ - "windows-targets", + "windows-targets 0.42.2", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.0", ] [[package]] @@ -1708,13 +1717,28 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + +[[package]] +name = "windows-targets" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" +dependencies = [ + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", ] [[package]] @@ -1723,42 +1747,84 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" + [[package]] name = "windows_i686_gnu" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" +[[package]] +name = "windows_i686_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" + [[package]] name = "windows_i686_msvc" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +[[package]] +name = "windows_i686_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" + [[package]] name = "winreg" version = "0.10.1" diff --git a/src/zmqcom.rs b/old/zmqcom.rs similarity index 100% rename from src/zmqcom.rs rename to old/zmqcom.rs diff --git a/src/ipblc.rs b/src/ipblc.rs index f0e1181..cf84282 100644 --- a/src/ipblc.rs +++ b/src/ipblc.rs @@ -1,7 +1,7 @@ -use crate::api::apiserver; use crate::config::{Context, GIT_VERSION}; use crate::fw::{fwblock, fwinit}; use crate::ip::{filter, IpData, IpEvent}; +use crate::monitoring::apiserver; use crate::utils::{gethostname, read_lines, sleep_ms}; use crate::webservice::send_to_ipbl_api; use crate::websocket::{send_to_ipbl_websocket, websocketinit}; @@ -17,7 +17,7 @@ use tokio::sync::RwLock; pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); const BL_CHAN_SIZE: usize = 32; -const ZMQ_CHAN_SIZE: usize = 64; +const WS_CHAN_SIZE: usize = 64; pub async fn run() { let fqdn = gethostname(true); @@ -32,11 +32,13 @@ pub async fn run() { println!("Launching {}, version {}", PKG_NAME, pkgversion); fwinit(); - apiserver(&ctxarc).await.unwrap(); + let ctxapi = Arc::clone(&ctxarc); + apiserver(&ctxapi).await.unwrap(); // initialize sockets - let (ipeventtx, mut ipeventrx): (Sender, Receiver) = channel(ZMQ_CHAN_SIZE); - let wssocket = websocketinit(&ctxarc, &ipeventtx).await; + let (ipeventtx, mut ipeventrx): (Sender, Receiver) = channel(WS_CHAN_SIZE); + let ctxws = Arc::clone(&ctxarc); + let wssocketrr = websocketinit(&ctxws, &ipeventtx).await; let mut blrx = watchfiles(&ctxarc).await; @@ -47,19 +49,15 @@ pub async fn run() { let bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); - send_to_ipbl_websocket(&wssocket, &bootstrap_event, &mut ret).await; + send_to_ipbl_websocket(&wssocketrr, &bootstrap_event, &mut ret).await; loop { ret = Vec::new(); - // wait for logs parse and zmq channel receive - let ipdata_wait = ipeventrx.recv(); - let force_wait = sleep_ms(200); - let ctxclone = Arc::clone(&ctxarc); tokio::select! { - val = ipdata_wait => { + val = ipeventrx.recv() => { let received_ip = val.unwrap(); let mut ctx = ctxclone.write().await; @@ -68,11 +66,11 @@ pub async fn run() { for ip_to_send in ctx.get_blocklist_toblock().await { let ipe = IpEvent{ msgtype: String::from("init"), - mode: String::from("zmq"), + mode: String::from("ws"), hostname: fqdn.clone(), ipdata: ip_to_send, }; - send_to_ipbl_websocket(&wssocket, &ipe, &mut ret).await; + send_to_ipbl_websocket(&wssocketrr, &ipe, &mut ret).await; } continue } @@ -90,12 +88,15 @@ pub async fn run() { hostname: fqdn.clone(), ipdata: ipevent.ipdata, }; - send_to_ipbl_api(&ctx, &event, &mut ret).await; - send_to_ipbl_websocket(&wssocket, &event, &mut ret).await; + println!("blabla1"); + send_to_ipbl_api(&ctx.client, &ctx.hostname, &ctx.flags.server, &event, &mut ret).await; + println!("blabla2"); + send_to_ipbl_websocket(&wssocketrr, &event, &mut ret).await; + println!("blabla3"); } } } - _val = force_wait => {} + _val = sleep_ms(200) => {} }; let toblock; diff --git a/src/main.rs b/src/main.rs index 420f70e..9d2566f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,8 @@ -mod api; mod config; mod fw; mod ip; mod ipblc; +mod monitoring; mod utils; mod webservice; mod websocket; diff --git a/src/api.rs b/src/monitoring.rs similarity index 88% rename from src/api.rs rename to src/monitoring.rs index 801f630..90b5b10 100644 --- a/src/api.rs +++ b/src/monitoring.rs @@ -8,10 +8,12 @@ use tokio::net::TcpSocket; use tokio::sync::RwLock; pub async fn apiserver(ctxarc: &Arc>) -> io::Result<()> { - let ctxclone = Arc::clone(ctxarc); - let ctx = ctxclone.read().await; - let addr = ctx.cfg.api.parse().unwrap(); - drop(ctx); + let ctxarc = ctxarc.clone(); + let addr; + { + let ctx = ctxarc.read().await; + addr = ctx.cfg.api.parse().unwrap(); + } let socket = TcpSocket::new_v4().unwrap(); socket.bind(addr).unwrap(); @@ -26,7 +28,7 @@ pub async fn apiserver(ctxarc: &Arc>) -> io::Result<()> { //let mut buf = [0; 1024]; let data; { - let ctx = ctxclone.read().await; + let ctx = ctxarc.read().await; data = serde_json::to_string(&ctx.blocklist); } diff --git a/src/webservice.rs b/src/webservice.rs index 27a79b2..d312a83 100644 --- a/src/webservice.rs +++ b/src/webservice.rs @@ -5,7 +5,7 @@ use crate::utils::sleep_s; use reqwest::Client; use reqwest::Error as ReqError; -pub async fn send_to_ipbl_ws( +pub async fn send_to_ipbl_api( client: &Client, hostname: &str, server: &str, @@ -13,6 +13,7 @@ pub async fn send_to_ipbl_ws( ret: &mut Vec, ) { ret.push(format!("host: {hostname}", hostname = hostname)); + let mut i = 1; loop { match push_ip(&client, &server, &ip.ipdata, ret).await { Ok(_) => { @@ -21,6 +22,10 @@ pub async fn send_to_ipbl_ws( Err(err) => { println!("{err}"); sleep_s(1).await; + if i == 10 { + break; + } + i += 1; } }; } diff --git a/src/websocket.rs b/src/websocket.rs index b70f375..4c2467c 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -1,7 +1,8 @@ use crate::config::{Context, WebSocketCfg}; use crate::ip::IpEvent; -use crate::utils::gethostname; +use crate::utils::{gethostname, sleep_ms}; +use serde_json::json; use std::net::TcpStream; use std::sync::Arc; use tokio::sync::mpsc::Sender; @@ -11,66 +12,107 @@ use tungstenite::*; pub async fn websocketconnect<'a>( wscfg: &WebSocketCfg, + hostname: String, ) -> Result>, Error> { - let (socket, _response) = connect(&wscfg.endpoint).expect("Can't connect"); + let (mut socket, _response) = connect(&wscfg.endpoint).expect("Can't connect"); + let msg = json!({ "hostname": hostname }); + socket + .write_message(Message::Text(msg.to_string())) + .unwrap(); Ok(socket) } pub async fn websocketinit( - ctx: &Arc>, + ctxarc: &Arc>, ipeventtx: &Sender, ) -> Arc>>> { - let ctxarc = Arc::clone(&ctx); - - let wssocket; - let wssocketcb; + let (wssocketps, wssocketreqrep); { let ctx = ctxarc.read().await; - wssocket = Arc::new(RwLock::new( - websocketconnect(&ctx.cfg.ws.get("websocket").unwrap()) + wssocketps = Arc::new(RwLock::new( + websocketconnect(&ctx.cfg.ws.get("pubsub").unwrap(), ctx.hostname.clone()) + .await + .unwrap(), + )); + wssocketreqrep = Arc::new(RwLock::new( + websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone()) .await .unwrap(), )); - - wssocketcb = wssocket.clone(); } - wslistenpubsub(wssocket, ipeventtx.clone()).await; - return wssocketcb; + wslistenpubsub(wssocketps, ipeventtx.clone()).await; + return wssocketreqrep; } async fn wslistenpubsub( - socket: Arc>>>, + websocket: Arc>>>, txpubsub: Sender, ) { tokio::spawn(async move { loop { let msgs: Option; { - let mut socket = socket.write().await; - msgs = match socket.read_message() { + let mut ws = websocket.write().await; + msgs = match ws.read_message() { Ok(s) => { println!("msg: {}", s); None } Err(e) => { println!("error: {e:?}"); - socket.close(None).unwrap(); + ws.close(None).unwrap(); return; } }; - } - match msgs { - Some(ss) => { - let tosend: IpEvent = serde_json::from_str(ss.as_str()).unwrap(); - if tosend.ipdata.hostname != gethostname(true) - || tosend.msgtype == "init".to_string() - { - txpubsub.send(tosend).await.unwrap(); + match msgs { + Some(ss) => { + let tosend: IpEvent = serde_json::from_str(ss.as_str()).unwrap(); + if tosend.ipdata.hostname != gethostname(true) + || tosend.msgtype == "init".to_string() + { + txpubsub.send(tosend).await.unwrap(); + } } - } - None => {} - }; + None => {} + }; + } + } + }); +} + +async fn wslistenreqrep( + websocket: Arc>>>, + txpubsub: Sender, +) { + tokio::spawn(async move { + loop { + let msgs: Option; + { + let mut ws = websocket.write().await; + msgs = match ws.read_message() { + Ok(s) => { + println!("msg: {}", s); + None + } + Err(e) => { + println!("error: {e:?}"); + ws.close(None).unwrap(); + return; + } + }; + match msgs { + Some(ss) => { + let tosend: IpEvent = serde_json::from_str(ss.as_str()).unwrap(); + if tosend.ipdata.hostname != gethostname(true) + || tosend.msgtype == "init".to_string() + { + txpubsub.send(tosend).await.unwrap(); + } + } + None => {} + }; + } } }); } @@ -82,21 +124,28 @@ pub async fn send_to_ipbl_websocket( ) { let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); + println!("testarc"); let wsclone = Arc::clone(wssocket); + println!("testarc2"); let mut ws = wsclone.write().await; + println!("write"); match ws.write_message(Message::Text(msg)) { - Ok(_) => {} + Ok(o) => { + println!("{o:?}") + } Err(e) => { - println!("{e:?}") + println!("err 1: {e:?}") } }; + + println!("read"); match ws.read_message() { Ok(o) => { println!("{o}") } Err(e) => { - println!("{e:?}") + println!("err 2: {e:?}") } }; }