updated websocket branch
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Paul 2023-03-12 14:27:05 +01:00
parent 831dcdace5
commit 36f892cf42
3 changed files with 48 additions and 59 deletions

View File

@ -56,7 +56,7 @@ See [here](NOTES.md)
## License ## License
```text ```text
Copyright (c) 2021, 2022 PaulBSD Copyright (c) 2021, 2022, 2023 PaulBSD
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without Redistribution and use in source and binary forms, with or without

View File

@ -18,7 +18,6 @@ use tokio::sync::RwLock;
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
const BL_CHAN_SIZE: usize = 32; const BL_CHAN_SIZE: usize = 32;
const ZMQ_CHAN_SIZE: usize = 64; const ZMQ_CHAN_SIZE: usize = 64;
const API_CHAN_SIZE: usize = 64;
pub async fn run() { pub async fn run() {
let fqdn = gethostname(true); let fqdn = gethostname(true);
@ -33,33 +32,28 @@ pub async fn run() {
println!("Launching {}, version {}", PKG_NAME, pkgversion); println!("Launching {}, version {}", PKG_NAME, pkgversion);
fwinit(); fwinit();
let (_apitx, mut apirx): (Sender<String>, Receiver<String>) = channel(API_CHAN_SIZE); apiserver(&ctxarc).await.unwrap();
let ctxclone = Arc::clone(&ctxarc);
apiserver(&ctxclone).await.unwrap();
// initialize sockets // initialize sockets
let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(ZMQ_CHAN_SIZE); let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(ZMQ_CHAN_SIZE);
let (wspubsubsocket, mut wsreqsocket) = websocketinit(&ctxclone, &ipeventtx).await; let wssocket = websocketinit(&ctxarc, &ipeventtx).await;
let mut blrx = watchfiles(&ctxclone).await; let mut blrx = watchfiles(&ctxarc).await;
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
tokio::spawn(async move { tokio::spawn(async move {
compare_files_changes(&ctxclone, &mut blrx, &ipeventtx).await; compare_files_changes(&ctxclone, &mut blrx, &ipeventtx).await;
}); });
let ctxclone = Arc::clone(&ctxarc); let bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone();
let bootstrap_event = ctxclone.read().await.cfg.bootstrap_event().clone();
send_to_ipbl_websocket(&mut wsreqsocket, &bootstrap_event, &mut ret).await; send_to_ipbl_websocket(&wssocket, &bootstrap_event, &mut ret).await;
loop { loop {
ret = Vec::new(); ret = Vec::new();
// wait for logs parse and zmq channel receive // wait for logs parse and zmq channel receive
//let mut received_ip = ipdatarx.recv();
let ipdata_wait = ipeventrx.recv(); let ipdata_wait = ipeventrx.recv();
let apimsg_wait = apirx.recv();
let force_wait = sleep_ms(200); let force_wait = sleep_ms(200);
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
@ -78,7 +72,7 @@ pub async fn run() {
hostname: fqdn.clone(), hostname: fqdn.clone(),
ipdata: ip_to_send, ipdata: ip_to_send,
}; };
send_to_ipbl_websocket(&mut wsreqsocket, &ipe, &mut ret).await; send_to_ipbl_websocket(&wssocket, &ipe, &mut ret).await;
} }
continue continue
} }
@ -97,11 +91,10 @@ pub async fn run() {
ipdata: ipevent.ipdata, ipdata: ipevent.ipdata,
}; };
send_to_ipbl_api(&ctx, &event, &mut ret).await; send_to_ipbl_api(&ctx, &event, &mut ret).await;
send_to_ipbl_websocket(&mut wsreqsocket, &event, &mut ret).await; send_to_ipbl_websocket(&wssocket, &event, &mut ret).await;
} }
} }
} }
_val = apimsg_wait => {}
_val = force_wait => {} _val = force_wait => {}
}; };
@ -110,7 +103,7 @@ pub async fn run() {
let mut ctx = ctxarc.write().await; let mut ctx = ctxarc.write().await;
ctx.gc_blocklist().await; ctx.gc_blocklist().await;
toblock = ctx.get_blocklist_toblock().await; toblock = ctx.get_blocklist_toblock().await;
}
// apply firewall blocking // apply firewall blocking
match fwblock(&toblock, &mut ret, &mut fwlen) { match fwblock(&toblock, &mut ret, &mut fwlen) {
Ok(_) => {} Ok(_) => {}
@ -118,13 +111,13 @@ pub async fn run() {
println!("Err: {err}, unable to push firewall rules, use super user") println!("Err: {err}, unable to push firewall rules, use super user")
} }
}; };
}
// log lines // log lines
if ret.len() > 0 { if ret.len() > 0 {
println!("{ret}", ret = ret.join(", ")); println!("{ret}", ret = ret.join(", "));
} }
{
let now_cfg_reload = Local::now().trunc_subsecs(0); let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - last_cfg_reload) > Duration::seconds(5) { if (now_cfg_reload - last_cfg_reload) > Duration::seconds(5) {
// reload configuration from the server // reload configuration from the server
@ -140,7 +133,6 @@ pub async fn run() {
}; };
} }
} }
}
async fn watchfiles(ctxarc: &Arc<RwLock<Context>>) -> Receiver<FileEvent> { async fn watchfiles(ctxarc: &Arc<RwLock<Context>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE); let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);

View File

@ -1,49 +1,42 @@
use crate::config::{Context, WebSocketCfg}; use crate::config::{Context, WebSocketCfg};
use crate::ip::IpEvent; use crate::ip::IpEvent;
use crate::utils::gethostname; use crate::utils::gethostname;
use std::net::TcpStream;
use tungstenite::stream::*;
use tungstenite::*;
use std::net::TcpStream;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tungstenite::stream::*;
use tungstenite::*;
pub async fn websocketconnect<'a>( pub async fn websocketconnect<'a>(
wscfg: &WebSocketCfg, wscfg: &WebSocketCfg,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> { ) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> {
let (socket, response) = connect(&wscfg.endpoint).expect("Can't connect"); let (socket, _response) = connect(&wscfg.endpoint).expect("Can't connect");
Ok(socket) Ok(socket)
} }
pub async fn websocketinit( pub async fn websocketinit(
ctx: &Arc<RwLock<Context>>, ctx: &Arc<RwLock<Context>>,
ipeventtx: &Sender<IpEvent>, ipeventtx: &Sender<IpEvent>,
) -> ( ) -> Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>> {
Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
WebSocket<MaybeTlsStream<TcpStream>>,
) {
let ctxarc = Arc::clone(&ctx); let ctxarc = Arc::clone(&ctx);
let wsreqsocket; let wssocket;
let wssubsocket; let wssocketcb;
let wssubsocketcb;
{ {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
wsreqsocket = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap()) wssocket = Arc::new(RwLock::new(
.await websocketconnect(&ctx.cfg.ws.get("websocket").unwrap())
.unwrap();
wssubsocket = Arc::new(RwLock::new(
websocketconnect(&ctx.cfg.ws.get("pubsub").unwrap())
.await .await
.unwrap(), .unwrap(),
)); ));
wssubsocketcb = wssubsocket.clone(); wssocketcb = wssocket.clone();
} }
wslistenpubsub(wssubsocket, ipeventtx.clone()).await; wslistenpubsub(wssocket, ipeventtx.clone()).await;
return (wssubsocketcb, wsreqsocket); return wssocketcb;
} }
async fn wslistenpubsub( async fn wslistenpubsub(
@ -83,18 +76,22 @@ async fn wslistenpubsub(
} }
pub async fn send_to_ipbl_websocket( pub async fn send_to_ipbl_websocket(
reqsocket: &mut WebSocket<MaybeTlsStream<TcpStream>>, wssocket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
ip: &IpEvent, ip: &IpEvent,
_ret: &mut Vec<String>, _ret: &mut Vec<String>,
) { ) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.write_message(Message::Text(msg)) {
let wsclone = Arc::clone(wssocket);
let mut ws = wsclone.write().await;
match ws.write_message(Message::Text(msg)) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("{e:?}") println!("{e:?}")
} }
}; };
match reqsocket.read_message() { match ws.read_message() {
Ok(o) => { Ok(o) => {
println!("{o}") println!("{o}")
} }