updated ipblc websocket feat

This commit is contained in:
Paul 2023-04-09 01:42:17 +02:00
parent 715194ede5
commit 47cc30e79e
7 changed files with 217 additions and 94 deletions

146
Cargo.lock generated
View File

@ -204,9 +204,9 @@ dependencies = [
[[package]] [[package]]
name = "core-foundation-sys" name = "core-foundation-sys"
version = "0.8.3" version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"
[[package]] [[package]]
name = "cpufeatures" name = "cpufeatures"
@ -306,13 +306,13 @@ dependencies = [
[[package]] [[package]]
name = "errno" name = "errno"
version = "0.3.0" version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d6a0976c999d473fe89ad888d5a284e55366d9dc9038b1ba2aa15128c4afa0" checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [ dependencies = [
"errno-dragonfly", "errno-dragonfly",
"libc", "libc",
"windows-sys 0.45.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -391,9 +391,9 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.8" version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
@ -535,9 +535,9 @@ dependencies = [
[[package]] [[package]]
name = "iana-time-zone" name = "iana-time-zone"
version = "0.1.54" version = "0.1.56"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c17cc76786e99f8d2f055c11159e7f0091c42474dcc3189fbab96072e873e6d" checksum = "0722cd7114b7de04316e7ea5456a0bbb20e4adb46fd27a3697adb812cff0f37c"
dependencies = [ dependencies = [
"android_system_properties", "android_system_properties",
"core-foundation-sys", "core-foundation-sys",
@ -579,13 +579,13 @@ dependencies = [
[[package]] [[package]]
name = "io-lifetimes" name = "io-lifetimes"
version = "1.0.9" version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09270fd4fa1111bc614ed2246c7ef56239a3063d5be0d1ec3b589c505d400aeb" checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220"
dependencies = [ dependencies = [
"hermit-abi 0.3.1", "hermit-abi 0.3.1",
"libc", "libc",
"windows-sys 0.45.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -616,14 +616,14 @@ checksum = "12b6ee2129af8d4fb011108c73d99a1b83a85977f23b82460c0ae2e25bb4b57f"
[[package]] [[package]]
name = "is-terminal" name = "is-terminal"
version = "0.4.6" version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "256017f749ab3117e93acb91063009e1f1bb56d03965b14c2c8df4eb02c524d8" checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [ dependencies = [
"hermit-abi 0.3.1", "hermit-abi 0.3.1",
"io-lifetimes", "io-lifetimes",
"rustix", "rustix",
"windows-sys 0.45.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -649,9 +649,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.140" version = "0.2.141"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5"
[[package]] [[package]]
name = "link-cplusplus" name = "link-cplusplus"
@ -904,9 +904,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.55" version = "1.0.56"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d0dd4be24fcdcfeaa12a432d588dc59bbad6cad3510c67e74a2b6b2fc950564" checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@ -1032,16 +1032,16 @@ dependencies = [
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.37.6" version = "0.37.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d097081ed288dfe45699b72f5b5d648e5f15d64d900c7080273baa20c16a6849" checksum = "85597d61f83914ddeba6a47b3b8ffe7365107221c2e557ed94426489fefb5f77"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"errno", "errno",
"io-lifetimes", "io-lifetimes",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
"windows-sys 0.45.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@ -1671,11 +1671,11 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]] [[package]]
name = "windows" name = "windows"
version = "0.46.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdacb41e6a96a052c6cb63a144f24900236121c6f63f4f8219fef5977ecb0c25" checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f"
dependencies = [ dependencies = [
"windows-targets", "windows-targets 0.48.0",
] ]
[[package]] [[package]]
@ -1684,13 +1684,13 @@ version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7"
dependencies = [ dependencies = [
"windows_aarch64_gnullvm", "windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc", "windows_aarch64_msvc 0.42.2",
"windows_i686_gnu", "windows_i686_gnu 0.42.2",
"windows_i686_msvc", "windows_i686_msvc 0.42.2",
"windows_x86_64_gnu", "windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm", "windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc", "windows_x86_64_msvc 0.42.2",
] ]
[[package]] [[package]]
@ -1699,7 +1699,16 @@ version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [ dependencies = [
"windows-targets", "windows-targets 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.0",
] ]
[[package]] [[package]]
@ -1708,13 +1717,28 @@ version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [ dependencies = [
"windows_aarch64_gnullvm", "windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc", "windows_aarch64_msvc 0.42.2",
"windows_i686_gnu", "windows_i686_gnu 0.42.2",
"windows_i686_msvc", "windows_i686_msvc 0.42.2",
"windows_x86_64_gnu", "windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm", "windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc", "windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-targets"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
dependencies = [
"windows_aarch64_gnullvm 0.48.0",
"windows_aarch64_msvc 0.48.0",
"windows_i686_gnu 0.48.0",
"windows_i686_msvc 0.48.0",
"windows_x86_64_gnu 0.48.0",
"windows_x86_64_gnullvm 0.48.0",
"windows_x86_64_msvc 0.48.0",
] ]
[[package]] [[package]]
@ -1723,42 +1747,84 @@ version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.42.2" version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.42.2" version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]]
name = "windows_i686_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.42.2" version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]]
name = "windows_i686_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.42.2" version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]] [[package]]
name = "windows_x86_64_gnullvm" name = "windows_x86_64_gnullvm"
version = "0.42.2" version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.42.2" version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]] [[package]]
name = "winreg" name = "winreg"
version = "0.10.1" version = "0.10.1"

View File

@ -1,7 +1,7 @@
use crate::api::apiserver;
use crate::config::{Context, GIT_VERSION}; use crate::config::{Context, GIT_VERSION};
use crate::fw::{fwblock, fwinit}; use crate::fw::{fwblock, fwinit};
use crate::ip::{filter, IpData, IpEvent}; use crate::ip::{filter, IpData, IpEvent};
use crate::monitoring::apiserver;
use crate::utils::{gethostname, read_lines, sleep_ms}; use crate::utils::{gethostname, read_lines, sleep_ms};
use crate::webservice::send_to_ipbl_api; use crate::webservice::send_to_ipbl_api;
use crate::websocket::{send_to_ipbl_websocket, websocketinit}; use crate::websocket::{send_to_ipbl_websocket, websocketinit};
@ -17,7 +17,7 @@ use tokio::sync::RwLock;
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
const BL_CHAN_SIZE: usize = 32; const BL_CHAN_SIZE: usize = 32;
const ZMQ_CHAN_SIZE: usize = 64; const WS_CHAN_SIZE: usize = 64;
pub async fn run() { pub async fn run() {
let fqdn = gethostname(true); let fqdn = gethostname(true);
@ -32,11 +32,13 @@ pub async fn run() {
println!("Launching {}, version {}", PKG_NAME, pkgversion); println!("Launching {}, version {}", PKG_NAME, pkgversion);
fwinit(); fwinit();
apiserver(&ctxarc).await.unwrap(); let ctxapi = Arc::clone(&ctxarc);
apiserver(&ctxapi).await.unwrap();
// initialize sockets // initialize sockets
let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(ZMQ_CHAN_SIZE); let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(WS_CHAN_SIZE);
let wssocket = websocketinit(&ctxarc, &ipeventtx).await; let ctxws = Arc::clone(&ctxarc);
let wssocketrr = websocketinit(&ctxws, &ipeventtx).await;
let mut blrx = watchfiles(&ctxarc).await; let mut blrx = watchfiles(&ctxarc).await;
@ -47,19 +49,15 @@ pub async fn run() {
let bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); let bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone();
send_to_ipbl_websocket(&wssocket, &bootstrap_event, &mut ret).await; send_to_ipbl_websocket(&wssocketrr, &bootstrap_event, &mut ret).await;
loop { loop {
ret = Vec::new(); ret = Vec::new();
// wait for logs parse and zmq channel receive
let ipdata_wait = ipeventrx.recv();
let force_wait = sleep_ms(200);
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
tokio::select! { tokio::select! {
val = ipdata_wait => { val = ipeventrx.recv() => {
let received_ip = val.unwrap(); let received_ip = val.unwrap();
let mut ctx = ctxclone.write().await; let mut ctx = ctxclone.write().await;
@ -68,11 +66,11 @@ pub async fn run() {
for ip_to_send in ctx.get_blocklist_toblock().await { for ip_to_send in ctx.get_blocklist_toblock().await {
let ipe = IpEvent{ let ipe = IpEvent{
msgtype: String::from("init"), msgtype: String::from("init"),
mode: String::from("zmq"), mode: String::from("ws"),
hostname: fqdn.clone(), hostname: fqdn.clone(),
ipdata: ip_to_send, ipdata: ip_to_send,
}; };
send_to_ipbl_websocket(&wssocket, &ipe, &mut ret).await; send_to_ipbl_websocket(&wssocketrr, &ipe, &mut ret).await;
} }
continue continue
} }
@ -90,12 +88,15 @@ pub async fn run() {
hostname: fqdn.clone(), hostname: fqdn.clone(),
ipdata: ipevent.ipdata, ipdata: ipevent.ipdata,
}; };
send_to_ipbl_api(&ctx, &event, &mut ret).await; println!("blabla1");
send_to_ipbl_websocket(&wssocket, &event, &mut ret).await; send_to_ipbl_api(&ctx.client, &ctx.hostname, &ctx.flags.server, &event, &mut ret).await;
println!("blabla2");
send_to_ipbl_websocket(&wssocketrr, &event, &mut ret).await;
println!("blabla3");
} }
} }
} }
_val = force_wait => {} _val = sleep_ms(200) => {}
}; };
let toblock; let toblock;

View File

@ -1,8 +1,8 @@
mod api;
mod config; mod config;
mod fw; mod fw;
mod ip; mod ip;
mod ipblc; mod ipblc;
mod monitoring;
mod utils; mod utils;
mod webservice; mod webservice;
mod websocket; mod websocket;

View File

@ -8,10 +8,12 @@ use tokio::net::TcpSocket;
use tokio::sync::RwLock; use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> { pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxclone = Arc::clone(ctxarc); let ctxarc = ctxarc.clone();
let ctx = ctxclone.read().await; let addr;
let addr = ctx.cfg.api.parse().unwrap(); {
drop(ctx); let ctx = ctxarc.read().await;
addr = ctx.cfg.api.parse().unwrap();
}
let socket = TcpSocket::new_v4().unwrap(); let socket = TcpSocket::new_v4().unwrap();
socket.bind(addr).unwrap(); socket.bind(addr).unwrap();
@ -26,7 +28,7 @@ pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
//let mut buf = [0; 1024]; //let mut buf = [0; 1024];
let data; let data;
{ {
let ctx = ctxclone.read().await; let ctx = ctxarc.read().await;
data = serde_json::to_string(&ctx.blocklist); data = serde_json::to_string(&ctx.blocklist);
} }

View File

@ -5,7 +5,7 @@ use crate::utils::sleep_s;
use reqwest::Client; use reqwest::Client;
use reqwest::Error as ReqError; use reqwest::Error as ReqError;
pub async fn send_to_ipbl_ws( pub async fn send_to_ipbl_api(
client: &Client, client: &Client,
hostname: &str, hostname: &str,
server: &str, server: &str,
@ -13,6 +13,7 @@ pub async fn send_to_ipbl_ws(
ret: &mut Vec<String>, ret: &mut Vec<String>,
) { ) {
ret.push(format!("host: {hostname}", hostname = hostname)); ret.push(format!("host: {hostname}", hostname = hostname));
let mut i = 1;
loop { loop {
match push_ip(&client, &server, &ip.ipdata, ret).await { match push_ip(&client, &server, &ip.ipdata, ret).await {
Ok(_) => { Ok(_) => {
@ -21,6 +22,10 @@ pub async fn send_to_ipbl_ws(
Err(err) => { Err(err) => {
println!("{err}"); println!("{err}");
sleep_s(1).await; sleep_s(1).await;
if i == 10 {
break;
}
i += 1;
} }
}; };
} }

View File

@ -1,7 +1,8 @@
use crate::config::{Context, WebSocketCfg}; use crate::config::{Context, WebSocketCfg};
use crate::ip::IpEvent; use crate::ip::IpEvent;
use crate::utils::gethostname; use crate::utils::{gethostname, sleep_ms};
use serde_json::json;
use std::net::TcpStream; use std::net::TcpStream;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
@ -11,55 +12,59 @@ use tungstenite::*;
pub async fn websocketconnect<'a>( pub async fn websocketconnect<'a>(
wscfg: &WebSocketCfg, wscfg: &WebSocketCfg,
hostname: String,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> { ) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> {
let (socket, _response) = connect(&wscfg.endpoint).expect("Can't connect"); let (mut socket, _response) = connect(&wscfg.endpoint).expect("Can't connect");
let msg = json!({ "hostname": hostname });
socket
.write_message(Message::Text(msg.to_string()))
.unwrap();
Ok(socket) Ok(socket)
} }
pub async fn websocketinit( pub async fn websocketinit(
ctx: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
ipeventtx: &Sender<IpEvent>, ipeventtx: &Sender<IpEvent>,
) -> Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>> { ) -> Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>> {
let ctxarc = Arc::clone(&ctx); let (wssocketps, wssocketreqrep);
let wssocket;
let wssocketcb;
{ {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
wssocket = Arc::new(RwLock::new( wssocketps = Arc::new(RwLock::new(
websocketconnect(&ctx.cfg.ws.get("websocket").unwrap()) websocketconnect(&ctx.cfg.ws.get("pubsub").unwrap(), ctx.hostname.clone())
.await
.unwrap(),
));
wssocketreqrep = Arc::new(RwLock::new(
websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone())
.await .await
.unwrap(), .unwrap(),
)); ));
wssocketcb = wssocket.clone();
} }
wslistenpubsub(wssocket, ipeventtx.clone()).await; wslistenpubsub(wssocketps, ipeventtx.clone()).await;
return wssocketcb; return wssocketreqrep;
} }
async fn wslistenpubsub( async fn wslistenpubsub(
socket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>, websocket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
txpubsub: Sender<IpEvent>, txpubsub: Sender<IpEvent>,
) { ) {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let msgs: Option<String>; let msgs: Option<String>;
{ {
let mut socket = socket.write().await; let mut ws = websocket.write().await;
msgs = match socket.read_message() { msgs = match ws.read_message() {
Ok(s) => { Ok(s) => {
println!("msg: {}", s); println!("msg: {}", s);
None None
} }
Err(e) => { Err(e) => {
println!("error: {e:?}"); println!("error: {e:?}");
socket.close(None).unwrap(); ws.close(None).unwrap();
return; return;
} }
}; };
}
match msgs { match msgs {
Some(ss) => { Some(ss) => {
let tosend: IpEvent = serde_json::from_str(ss.as_str()).unwrap(); let tosend: IpEvent = serde_json::from_str(ss.as_str()).unwrap();
@ -72,6 +77,43 @@ async fn wslistenpubsub(
None => {} None => {}
}; };
} }
}
});
}
async fn wslistenreqrep(
websocket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
txpubsub: Sender<IpEvent>,
) {
tokio::spawn(async move {
loop {
let msgs: Option<String>;
{
let mut ws = websocket.write().await;
msgs = match ws.read_message() {
Ok(s) => {
println!("msg: {}", s);
None
}
Err(e) => {
println!("error: {e:?}");
ws.close(None).unwrap();
return;
}
};
match msgs {
Some(ss) => {
let tosend: IpEvent = serde_json::from_str(ss.as_str()).unwrap();
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
}
}); });
} }
@ -82,21 +124,28 @@ pub async fn send_to_ipbl_websocket(
) { ) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
println!("testarc");
let wsclone = Arc::clone(wssocket); let wsclone = Arc::clone(wssocket);
println!("testarc2");
let mut ws = wsclone.write().await; let mut ws = wsclone.write().await;
println!("write");
match ws.write_message(Message::Text(msg)) { match ws.write_message(Message::Text(msg)) {
Ok(_) => {} Ok(o) => {
println!("{o:?}")
}
Err(e) => { Err(e) => {
println!("{e:?}") println!("err 1: {e:?}")
} }
}; };
println!("read");
match ws.read_message() { match ws.read_message() {
Ok(o) => { Ok(o) => {
println!("{o}") println!("{o}")
} }
Err(e) => { Err(e) => {
println!("{e:?}") println!("err 2: {e:?}")
} }
}; };
} }