132 lines
3.8 KiB
Rust
132 lines
3.8 KiB
Rust
use crate::config::{Context, WebSocketCfg};
|
|
use crate::ip::IpEvent;
|
|
use crate::utils::{gethostname, sleep_s};
|
|
|
|
use serde_json::json;
|
|
use std::io::{self, Write};
|
|
use std::net::TcpStream;
|
|
use std::sync::Arc;
|
|
use tokio::sync::mpsc::Sender;
|
|
use tokio::sync::RwLock;
|
|
use tungstenite::stream::*;
|
|
use tungstenite::*;
|
|
|
|
pub async fn websocketreqrep(
|
|
ctxarc: &Arc<RwLock<Context>>,
|
|
) -> WebSocket<MaybeTlsStream<TcpStream>> {
|
|
let (mut wssocketrr, bootstrap_event, cfg);
|
|
{
|
|
let ctx = ctxarc.read().await;
|
|
bootstrap_event = ctx.cfg.bootstrap_event().clone();
|
|
cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
|
|
}
|
|
wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
|
|
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
|
|
|
|
return wssocketrr;
|
|
}
|
|
|
|
pub async fn websocketpubsub(
|
|
ctxarc: &Arc<RwLock<Context>>,
|
|
txpubsub: Arc<RwLock<Sender<IpEvent>>>,
|
|
) {
|
|
let cfg;
|
|
{
|
|
let ctx = ctxarc.read().await;
|
|
cfg = ctx.cfg.ws.get("pubsub").unwrap().clone();
|
|
}
|
|
let mut websocket = Arc::new(RwLock::new(
|
|
websocketconnect(&cfg, &gethostname(true)).await.unwrap(),
|
|
));
|
|
tokio::spawn(async move {
|
|
loop {
|
|
let mut ws = websocket.write().await;
|
|
match ws.read() {
|
|
Ok(msg) => {
|
|
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
|
|
Ok(o) => o,
|
|
Err(_e) => {
|
|
continue;
|
|
}
|
|
};
|
|
if tosend.ipdata.hostname != gethostname(true)
|
|
|| tosend.msgtype == "init".to_string()
|
|
{
|
|
let txps = txpubsub.read().await;
|
|
txps.send(tosend).await.unwrap();
|
|
}
|
|
}
|
|
Err(e) => {
|
|
println!("error in pubsub: {e:?}");
|
|
ws.close(None).unwrap();
|
|
drop(ws);
|
|
websocket = Arc::new(RwLock::new(
|
|
websocketconnect(&cfg, &gethostname(true)).await.unwrap(),
|
|
));
|
|
}
|
|
};
|
|
}
|
|
});
|
|
}
|
|
|
|
pub async fn websocketconnect<'a>(
|
|
wscfg: &WebSocketCfg,
|
|
hostname: &String,
|
|
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> {
|
|
let endpoint = &wscfg.endpoint;
|
|
print!("connecting to {} ... ", endpoint);
|
|
io::stdout().flush().unwrap();
|
|
let mut socket;
|
|
loop {
|
|
(socket, _) = match connect(endpoint) {
|
|
Ok((o, e)) => (o, e),
|
|
_ => {
|
|
println!("error connecting to {endpoint}, retrying");
|
|
sleep_s(1).await;
|
|
continue;
|
|
}
|
|
};
|
|
break;
|
|
}
|
|
println!("connected to {endpoint}");
|
|
let msg = json!({ "hostname": hostname });
|
|
socket.send(Message::Text(msg.to_string())).unwrap();
|
|
Ok(socket)
|
|
}
|
|
|
|
pub async fn send_to_ipbl_websocket(
|
|
ws: &mut WebSocket<MaybeTlsStream<TcpStream>>,
|
|
ip: &IpEvent,
|
|
) -> bool {
|
|
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
|
|
|
|
if ws.can_write() {
|
|
match ws.send(Message::Text(msg)) {
|
|
Ok(_) => {}
|
|
Err(e) => {
|
|
println!("err send read: {e:?}");
|
|
ws.close(None).unwrap_or(());
|
|
return false;
|
|
}
|
|
};
|
|
} else {
|
|
ws.close(None).unwrap_or(());
|
|
return false;
|
|
};
|
|
|
|
if ws.can_read() {
|
|
match ws.read() {
|
|
Ok(_) => {}
|
|
Err(e) => {
|
|
println!("err send read: {e:?}");
|
|
ws.close(None).unwrap_or(());
|
|
return false;
|
|
}
|
|
};
|
|
} else {
|
|
ws.close(None).unwrap_or(());
|
|
return false;
|
|
};
|
|
true
|
|
}
|