updated error handling

This commit is contained in:
Paul 2023-04-10 16:00:57 +02:00
parent 5b47d9d257
commit 301775c91d

View File

@ -10,6 +10,58 @@ use tokio::sync::RwLock;
use tungstenite::stream::*; use tungstenite::stream::*;
use tungstenite::*; use tungstenite::*;
pub async fn websocketinit(
ctxarc: &Arc<RwLock<Context>>,
ipeventtx: Arc<RwLock<Sender<IpEvent>>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event);
{
let ctx = ctxarc.read().await;
bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone();
wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone())
.await
.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
}
wslistenpubsub(ctxarc, ipeventtx).await;
return wssocketrr;
}
async fn wslistenpubsub(ctxarc: &Arc<RwLock<Context>>, txpubsub: Arc<RwLock<Sender<IpEvent>>>) {
let ctx = ctxarc.read().await;
let cfg = ctx.cfg.ws.get("pubsub").unwrap().clone();
let hostname = ctx.hostname.clone();
let mut websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, hostname.clone()).await.unwrap(),
));
tokio::spawn(async move {
loop {
let mut ws = websocket.write().await;
match ws.read_message() {
Ok(msg) => {
let tosend: IpEvent = serde_json::from_str(msg.to_string().as_str()).unwrap();
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
let txps = txpubsub.write().await;
txps.send(tosend).await.unwrap();
}
}
Err(e) => {
println!("error in pubsub: {e:?}");
ws.close(None).unwrap();
drop(ws);
websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, hostname.clone()).await.unwrap(),
));
}
};
}
});
}
pub async fn websocketconnect<'a>( pub async fn websocketconnect<'a>(
wscfg: &WebSocketCfg, wscfg: &WebSocketCfg,
hostname: String, hostname: String,
@ -22,59 +74,6 @@ pub async fn websocketconnect<'a>(
Ok(socket) Ok(socket)
} }
pub async fn websocketinit(
ctxarc: &Arc<RwLock<Context>>,
ipeventtx: Arc<RwLock<Sender<IpEvent>>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (wssocketps, mut wssocketrr, bootstrap_event);
{
let ctx = ctxarc.read().await;
bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone();
wssocketps = Arc::new(RwLock::new(
websocketconnect(&ctx.cfg.ws.get("pubsub").unwrap(), ctx.hostname.clone())
.await
.unwrap(),
));
wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone())
.await
.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
}
wslistenpubsub(wssocketps, ipeventtx).await;
return wssocketrr;
}
async fn wslistenpubsub(
websocket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
txpubsub: Arc<RwLock<Sender<IpEvent>>>,
) {
tokio::spawn(async move {
loop {
{
let mut ws = websocket.write().await;
match ws.read_message() {
Ok(msg) => {
let tosend: IpEvent =
serde_json::from_str(msg.to_string().as_str()).unwrap();
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
let txps = txpubsub.write().await;
txps.send(tosend).await.unwrap();
}
}
Err(e) => {
println!("error: {e:?}");
ws.close(None).unwrap();
return;
}
};
}
}
});
}
pub async fn send_to_ipbl_websocket( pub async fn send_to_ipbl_websocket(
ws: &mut WebSocket<MaybeTlsStream<TcpStream>>, ws: &mut WebSocket<MaybeTlsStream<TcpStream>>,
ip: &IpEvent, ip: &IpEvent,
@ -84,7 +83,8 @@ pub async fn send_to_ipbl_websocket(
match ws.write_message(Message::Text(msg)) { match ws.write_message(Message::Text(msg)) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err 1: {e:?}"); println!("err send write: {e:?}");
ws.close(None).unwrap();
return false; return false;
} }
}; };
@ -92,7 +92,8 @@ pub async fn send_to_ipbl_websocket(
match ws.read_message() { match ws.read_message() {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err 2: {e:?}"); println!("err send read: {e:?}");
ws.close(None).unwrap();
return false; return false;
} }
}; };