updated ipblc websocket feat

This commit is contained in:
Paul 2023-04-10 11:31:16 +02:00
parent 50b9c7f7b2
commit 9c5cc95d4b
3 changed files with 39 additions and 53 deletions

View File

@ -20,10 +20,8 @@ const BL_CHAN_SIZE: usize = 32;
const WS_CHAN_SIZE: usize = 64; const WS_CHAN_SIZE: usize = 64;
pub async fn run() { pub async fn run() {
let fqdn = gethostname(true);
let globalctx = Context::new().await; let globalctx = Context::new().await;
let ctxarc = Arc::new(RwLock::new(globalctx)); let ctxarc = Arc::new(RwLock::new(globalctx));
let mut ret: Vec<String> = Vec::new();
let mut fwlen: usize = 0; let mut fwlen: usize = 0;
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
@ -37,22 +35,22 @@ pub async fn run() {
// initialize sockets // initialize sockets
let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(WS_CHAN_SIZE); let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(WS_CHAN_SIZE);
let ipeventtxarc = Arc::new(RwLock::new(ipeventtx));
let ctxws = Arc::clone(&ctxarc); let ctxws = Arc::clone(&ctxarc);
let mut wssocketrr = websocketinit(&ctxws, &ipeventtx).await; let mut ipeventws = Arc::clone(&ipeventtxarc);
let mut wssocketrr = websocketinit(&ctxws, ipeventws).await;
let mut blrx = watchfiles(&ctxarc).await; let mut blrx = watchfiles(&ctxarc).await;
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
let ipeventclone = Arc::clone(&ipeventtxarc);
tokio::spawn(async move { tokio::spawn(async move {
compare_files_changes(&ctxclone, &mut blrx, &ipeventtx).await; compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
}); });
let bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event, &mut ret).await;
loop { loop {
ret = Vec::new(); let mut ret: Vec<String> = Vec::new();
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
@ -67,10 +65,10 @@ pub async fn run() {
let ipe = IpEvent{ let ipe = IpEvent{
msgtype: String::from("init"), msgtype: String::from("init"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: fqdn.clone(), hostname: gethostname(true),
ipdata: ip_to_send, ipdata: ip_to_send,
}; };
send_to_ipbl_websocket(&mut wssocketrr, &ipe, &mut ret).await; send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
} }
continue continue
} }
@ -82,14 +80,19 @@ pub async fn run() {
if let Some(ipevent) = filtered_ipevent { if let Some(ipevent) = filtered_ipevent {
if received_ip.msgtype != "init" { if received_ip.msgtype != "init" {
println!("sending {} to api and ws", ipevent.ipdata.ip); println!("sending {} to api and ws", ipevent.ipdata.ip);
let event = IpEvent{ let ipe = IpEvent{
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: fqdn.clone(), hostname: gethostname(true),
ipdata: ipevent.ipdata, ipdata: ipevent.ipdata,
}; };
send_to_ipbl_api(&ctx.client, &ctx.hostname, &ctx.flags.server, &event, &mut ret).await; send_to_ipbl_api(&ctx.client, &ctx.flags.server, &ipe).await;
send_to_ipbl_websocket(&mut wssocketrr, &event, &mut ret).await; let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
if !status {
ipeventws = Arc::clone(&ipeventtxarc);
wssocketrr = websocketinit(&ctxws, ipeventws).await;
}
} }
} }
} }
@ -170,7 +173,7 @@ async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, b
async fn compare_files_changes( async fn compare_files_changes(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
inrx: &mut Receiver<FileEvent>, inrx: &mut Receiver<FileEvent>,
ipeventtx: &Sender<IpEvent>, ipeventtx: &Arc<RwLock<Sender<IpEvent>>>,
) { ) {
let mut tnets; let mut tnets;
loop { loop {
@ -236,7 +239,8 @@ async fn compare_files_changes(
mode: String::from("file"), mode: String::from("file"),
ipdata: ip, ipdata: ip,
}; };
ipeventtx.send(ipevent).await.unwrap(); let ipetx = ipeventtx.write().await;
ipetx.send(ipevent).await.unwrap();
} }
} }
None => {} None => {}

View File

@ -5,17 +5,10 @@ use crate::utils::sleep_s;
use reqwest::Client; use reqwest::Client;
use reqwest::Error as ReqError; use reqwest::Error as ReqError;
pub async fn send_to_ipbl_api( pub async fn send_to_ipbl_api(client: &Client, server: &str, ip: &IpEvent) {
client: &Client,
hostname: &str,
server: &str,
ip: &IpEvent,
ret: &mut Vec<String>,
) {
ret.push(format!("host: {hostname}", hostname = hostname));
let mut i = 1; let mut i = 1;
loop { loop {
match push_ip(&client, &server, &ip.ipdata, ret).await { match push_ip(&client, &server, &ip.ipdata).await {
Ok(_) => { Ok(_) => {
break; break;
} }
@ -31,13 +24,7 @@ pub async fn send_to_ipbl_api(
} }
} }
async fn push_ip( async fn push_ip(client: &Client, server: &str, ip: &IpData) -> Result<(), ReqError> {
client: &Client,
server: &str,
ip: &IpData,
ret: &mut Vec<String>,
) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![]; let mut data: Vec<IpData> = vec![];
data.push(IpData { data.push(IpData {
@ -47,22 +34,12 @@ async fn push_ip(
hostname: ip.hostname.to_string(), hostname: ip.hostname.to_string(),
}); });
let resp = client client
.post(format!("{server}/ips")) .post(format!("{server}/ips"))
.json(&data) .json(&data)
.send() .send()
.await?; .await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(()) Ok(())
} }

View File

@ -24,11 +24,12 @@ pub async fn websocketconnect<'a>(
pub async fn websocketinit( pub async fn websocketinit(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
ipeventtx: &Sender<IpEvent>, ipeventtx: Arc<RwLock<Sender<IpEvent>>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> { ) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (wssocketps, wssocketrr); let (wssocketps, mut wssocketrr, bootstrap_event);
{ {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone();
wssocketps = Arc::new(RwLock::new( wssocketps = Arc::new(RwLock::new(
websocketconnect(&ctx.cfg.ws.get("pubsub").unwrap(), ctx.hostname.clone()) websocketconnect(&ctx.cfg.ws.get("pubsub").unwrap(), ctx.hostname.clone())
.await .await
@ -37,15 +38,16 @@ pub async fn websocketinit(
wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone()) wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone())
.await .await
.unwrap(); .unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
} }
wslistenpubsub(wssocketps, ipeventtx.clone()).await; wslistenpubsub(wssocketps, ipeventtx).await;
return wssocketrr; return wssocketrr;
} }
async fn wslistenpubsub( async fn wslistenpubsub(
websocket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>, websocket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
txpubsub: Sender<IpEvent>, txpubsub: Arc<RwLock<Sender<IpEvent>>>,
) { ) {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
@ -58,7 +60,8 @@ async fn wslistenpubsub(
if tosend.ipdata.hostname != gethostname(true) if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string() || tosend.msgtype == "init".to_string()
{ {
txpubsub.send(tosend).await.unwrap(); let txps = txpubsub.write().await;
txps.send(tosend).await.unwrap();
} }
} }
Err(e) => { Err(e) => {
@ -75,21 +78,23 @@ async fn wslistenpubsub(
pub async fn send_to_ipbl_websocket( pub async fn send_to_ipbl_websocket(
ws: &mut WebSocket<MaybeTlsStream<TcpStream>>, ws: &mut WebSocket<MaybeTlsStream<TcpStream>>,
ip: &IpEvent, ip: &IpEvent,
_ret: &mut Vec<String>, ) -> bool {
) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match ws.write_message(Message::Text(msg)) { match ws.write_message(Message::Text(msg)) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err 1: {e:?}") println!("err 1: {e:?}");
return false;
} }
}; };
match ws.read_message() { match ws.read_message() {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err 2: {e:?}") println!("err 2: {e:?}");
return false;
} }
}; };
true
} }