updated error handling

This commit is contained in:
Paul 2023-04-10 16:33:03 +02:00
parent 2dfee34f7c
commit bb0a272d0f
2 changed files with 21 additions and 12 deletions

View File

@ -4,7 +4,7 @@ use crate::ip::{filter, IpData, IpEvent};
use crate::monitoring::apiserver; use crate::monitoring::apiserver;
use crate::utils::{gethostname, read_lines, sleep_ms}; use crate::utils::{gethostname, read_lines, sleep_ms};
use crate::webservice::send_to_ipbl_api; use crate::webservice::send_to_ipbl_api;
use crate::websocket::{send_to_ipbl_websocket, websocketinit}; use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep};
use chrono::prelude::*; use chrono::prelude::*;
use chrono::prelude::{DateTime, Local}; use chrono::prelude::{DateTime, Local};
@ -37,9 +37,12 @@ pub async fn run() {
let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(WS_CHAN_SIZE); let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(WS_CHAN_SIZE);
let ipeventtxarc = Arc::new(RwLock::new(ipeventtx)); let ipeventtxarc = Arc::new(RwLock::new(ipeventtx));
let ctxws = Arc::clone(&ctxarc); let ctxwsrr = Arc::clone(&ctxarc);
let mut ipeventws = Arc::clone(&ipeventtxarc); let ipeventws = Arc::clone(&ipeventtxarc);
let mut wssocketrr = websocketinit(&ctxws, ipeventws).await; let mut wssocketrr = websocketreqrep(&ctxwsrr).await;
let ctxwsps = Arc::clone(&ctxarc);
websocketpubsub(&ctxwsps, ipeventws).await;
let mut blrx = watchfiles(&ctxarc).await; let mut blrx = watchfiles(&ctxarc).await;
@ -70,8 +73,8 @@ pub async fn run() {
}; };
let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
if !status { if !status {
ipeventws = Arc::clone(&ipeventtxarc); wssocketrr = websocketreqrep(&ctxwsrr).await;
wssocketrr = websocketinit(&ctxws, ipeventws).await; continue;
} }
} }
continue continue
@ -93,8 +96,8 @@ pub async fn run() {
send_to_ipbl_api(&ctx.client, &ctx.flags.server, &ipe).await; send_to_ipbl_api(&ctx.client, &ctx.flags.server, &ipe).await;
let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await; let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
if !status { if !status {
ipeventws = Arc::clone(&ipeventtxarc); wssocketrr = websocketreqrep(&ctxwsrr).await;
wssocketrr = websocketinit(&ctxws, ipeventws).await; continue;
} }
} }

View File

@ -3,6 +3,7 @@ use crate::ip::IpEvent;
use crate::utils::gethostname; use crate::utils::gethostname;
use serde_json::json; use serde_json::json;
use std::io::{self, Write};
use std::net::TcpStream; use std::net::TcpStream;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
@ -10,9 +11,8 @@ use tokio::sync::RwLock;
use tungstenite::stream::*; use tungstenite::stream::*;
use tungstenite::*; use tungstenite::*;
pub async fn websocketinit( pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
ipeventtx: Arc<RwLock<Sender<IpEvent>>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> { ) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event); let (mut wssocketrr, bootstrap_event);
{ {
@ -25,14 +25,17 @@ pub async fn websocketinit(
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
} }
wslistenpubsub(ctxarc, ipeventtx).await;
return wssocketrr; return wssocketrr;
} }
async fn wslistenpubsub(ctxarc: &Arc<RwLock<Context>>, txpubsub: Arc<RwLock<Sender<IpEvent>>>) { pub async fn websocketpubsub(
ctxarc: &Arc<RwLock<Context>>,
txpubsub: Arc<RwLock<Sender<IpEvent>>>,
) {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
let cfg = ctx.cfg.ws.get("pubsub").unwrap().clone(); let cfg = ctx.cfg.ws.get("pubsub").unwrap().clone();
let hostname = ctx.hostname.clone(); let hostname = ctx.hostname.clone();
drop(ctx);
let mut websocket = Arc::new(RwLock::new( let mut websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, hostname.clone()).await.unwrap(), websocketconnect(&cfg, hostname.clone()).await.unwrap(),
)); ));
@ -66,7 +69,10 @@ pub async fn websocketconnect<'a>(
wscfg: &WebSocketCfg, wscfg: &WebSocketCfg,
hostname: String, hostname: String,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> { ) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> {
print!("connecting to {} ...", &wscfg.endpoint);
io::stdout().flush().unwrap();
let (mut socket, _response) = connect(&wscfg.endpoint).expect("Can't connect"); let (mut socket, _response) = connect(&wscfg.endpoint).expect("Can't connect");
print!(" connected!\n");
let msg = json!({ "hostname": hostname }); let msg = json!({ "hostname": hostname });
socket socket
.write_message(Message::Text(msg.to_string())) .write_message(Message::Text(msg.to_string()))