ipblc/src/websocket.rs
Paul Lecuq f29ccd3f0b
Some checks failed
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is failing
updated ipevent with Option<IpData>
2023-12-26 10:42:39 +01:00

133 lines
3.9 KiB
Rust

use crate::config::{Context, WebSocketCfg};
use crate::ip::IpEvent;
use crate::utils::{gethostname, sleep_s};
use serde_json::json;
use std::io::{self, Write};
use std::net::TcpStream;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tungstenite::stream::*;
use tungstenite::*;
/// Opens the `"reqrep"` websocket and sends the bootstrap event over it.
///
/// The `"reqrep"` entry of `ctx.cfg.ws` and the bootstrap event are copied out
/// while the context read lock is held; the lock is released before any
/// network call is made.
///
/// The boolean returned by [`send_to_ipbl_websocket`] is not inspected, so the
/// socket is handed back to the caller whether or not the bootstrap exchange
/// succeeded.
///
/// # Panics
///
/// Panics if the configuration has no `"reqrep"` websocket entry or if
/// [`websocketconnect`] returns an error.
pub async fn websocketreqrep(
    ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> {
    // Keep the read guard confined to this block so it is dropped before connecting.
    let (bootstrap_event, reqrep_cfg) = {
        let ctx = ctxarc.read().await;
        let event = ctx.cfg.bootstrap_event().clone();
        let cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
        (event, cfg)
    };

    let local_host = gethostname(true);
    let mut socket = websocketconnect(&reqrep_cfg, &local_host).await.unwrap();
    send_to_ipbl_websocket(&mut socket, &bootstrap_event).await;
    socket
}
/// Connects to the `"pubsub"` websocket and spawns a task that forwards the
/// events it receives to `txpubsub`.
///
/// The function returns as soon as the task is spawned; the task is detached
/// and keeps running on its own.
///
/// For every incoming message the task:
/// * parses it as an [`IpEvent`] (unparsable messages are logged and skipped),
/// * forwards it when its `msgtype` is `"init"`, when it carries no `ipdata`,
///   or when `ipdata.hostname` differs from the local hostname,
/// * on a read error, closes the socket on a best-effort basis and reconnects.
///
/// The task stops when the receiving side of `txpubsub` has been dropped.
///
/// # Panics
///
/// Panics if the configuration has no `"pubsub"` websocket entry or if
/// [`websocketconnect`] returns an error.
pub async fn websocketpubsub(
    ctxarc: &Arc<RwLock<Context>>,
    txpubsub: Arc<RwLock<Sender<IpEvent>>>,
) {
    // Copy the config out so the context lock is not held while connecting.
    let cfg = {
        let ctx = ctxarc.read().await;
        ctx.cfg.ws.get("pubsub").unwrap().clone()
    };

    // The socket is used by the spawned task only, so it is moved into the
    // task directly instead of being wrapped in a lock.
    let mut websocket = websocketconnect(&cfg, &gethostname(true)).await.unwrap();

    tokio::spawn(async move {
        loop {
            // NOTE(review): this is the synchronous tungstenite `read()` on a
            // std `TcpStream`; it appears to block the runtime worker while
            // waiting for a message -- confirm this is acceptable.
            let msg = match websocket.read() {
                Ok(msg) => msg,
                Err(e) => {
                    println!("error in pubsub: {e:?}");
                    // The connection is already broken, so closing may fail as
                    // well; that must not prevent the reconnect below.
                    let _ = websocket.close(None);
                    websocket = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
                    continue;
                }
            };

            let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
                Ok(event) => event,
                Err(e) => {
                    println!("error in pubsub: {e:?}");
                    continue;
                }
            };

            // Skip events this host published itself, except "init" events.
            // An event without `ipdata` carries no origin hostname and is
            // forwarded rather than crashing the task.
            // NOTE(review): confirm consumers of `txpubsub` accept `ipdata == None`.
            let forward = tosend.msgtype == "init"
                || tosend
                    .ipdata
                    .as_ref()
                    .map_or(true, |data| data.hostname != gethostname(true));

            if forward {
                let txps = txpubsub.read().await;
                if txps.send(tosend).await.is_err() {
                    // Nobody is listening any more: stop instead of panicking.
                    println!("error in pubsub: receiver closed, stopping listener");
                    return;
                }
            }
        }
    });
}
/// Connects to `wscfg.endpoint` and identifies this client by sending
/// `{"hostname": <hostname>}` as the first text message.
///
/// Connection attempts are retried once per second, without limit, until one
/// succeeds; progress is reported on stdout.
///
/// # Errors
///
/// Returns the underlying websocket error if the hostname message cannot be
/// sent on the freshly opened socket. Connection failures are never returned;
/// they are retried.
pub async fn websocketconnect(
    wscfg: &WebSocketCfg,
    hostname: &str,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> {
    let endpoint = &wscfg.endpoint;

    print!("connecting to {} ... ", endpoint);
    // Flushing only makes the progress line visible early; a failure here is
    // not a reason to abort the connection attempt.
    let _ = io::stdout().flush();

    // NOTE(review): `connect` is the synchronous tungstenite call on a std
    // `TcpStream`; it appears to block the runtime worker -- confirm.
    let mut socket = loop {
        match connect(endpoint) {
            Ok((socket, _response)) => break socket,
            Err(_) => {
                println!("error connecting to {endpoint}, retrying");
                sleep_s(1).await;
            }
        }
    };
    println!("connected to {endpoint}");

    let msg = json!({ "hostname": hostname });
    // Report a failed handshake through the existing `Result` instead of
    // panicking inside the helper.
    socket.send(Message::Text(msg.to_string()))?;
    Ok(socket)
}
/// Sends `ip` as a JSON text message on `ws` and then reads one message back.
///
/// Returns `true` only when both the send and the following read succeed; the
/// content of the message read back is discarded. In every failure case
/// (socket not writable, send error, socket not readable, read error) the
/// socket is closed on a best-effort basis and `false` is returned. Send and
/// read errors are additionally logged to stdout.
///
/// # Panics
///
/// Panics if `ip` cannot be serialized to JSON.
pub async fn send_to_ipbl_websocket(
    ws: &mut WebSocket<MaybeTlsStream<TcpStream>>,
    ip: &IpEvent,
) -> bool {
    let payload = serde_json::to_string(&ip).unwrap();

    // Request half: only attempt the send when the socket is still writable.
    let sent = ws.can_write()
        && match ws.send(Message::Text(payload)) {
            Ok(_) => true,
            Err(e) => {
                println!("err send read: {e:?}");
                false
            }
        };
    if !sent {
        ws.close(None).unwrap_or(());
        return false;
    }

    // Reply half: wait for one message; its content does not matter here.
    let answered = ws.can_read()
        && match ws.read() {
            Ok(_) => true,
            Err(e) => {
                println!("err send read: {e:?}");
                false
            }
        };
    if !answered {
        ws.close(None).unwrap_or(());
        return false;
    }

    true
}