more stable ipblc websocket feat

This commit is contained in:
Paul 2023-04-09 15:05:09 +02:00
parent 47cc30e79e
commit 50b9c7f7b2
3 changed files with 27 additions and 133 deletions

View File

@ -16,7 +16,6 @@ use std::path::Path;
pub const GIT_VERSION: &str = git_version!(); pub const GIT_VERSION: &str = git_version!();
const MASTERSERVER: &str = "ipbl.paulbsd.com"; const MASTERSERVER: &str = "ipbl.paulbsd.com";
const ZMQSUBSCRIPTION: &str = "ipbl";
const WSSUBSCRIPTION: &str = "ipbl"; const WSSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY: u64 = 10; const CONFIG_RETRY: u64 = 10;
@ -288,7 +287,6 @@ pub struct Config {
#[serde(skip_serializing)] #[serde(skip_serializing)]
pub trustnets: Vec<String>, pub trustnets: Vec<String>,
pub ws: HashMap<String, WebSocketCfg>, pub ws: HashMap<String, WebSocketCfg>,
pub zmq: HashMap<String, ZMQCfg>,
pub api: String, pub api: String,
} }
@ -348,17 +346,6 @@ impl Config {
endpoint: format!("wss://{}/wsrr", MASTERSERVER.to_string()), endpoint: format!("wss://{}/wsrr", MASTERSERVER.to_string()),
subscription: WSSUBSCRIPTION.to_string(), subscription: WSSUBSCRIPTION.to_string(),
})]), })]),
zmq: HashMap::from([("pubsub".to_string(),ZMQCfg{
t: "pubsub".to_string(),
hostname: MASTERSERVER.to_string(),
port: 9999,
subscription: ZMQSUBSCRIPTION.to_string(),
}),("reqrep".to_string(),ZMQCfg {
t: "reqrep".to_string(),
hostname: MASTERSERVER.to_string(),
port: 9998,
subscription: String::new(),
})]),
api: String::from("127.0.0.1:8060") api: String::from("127.0.0.1:8060")
} }
} }
@ -367,7 +354,6 @@ impl Config {
self.get_global_config(&ctx).await?; self.get_global_config(&ctx).await?;
self.get_trustnets(&ctx).await?; self.get_trustnets(&ctx).await?;
self.get_sets(&ctx).await?; self.get_sets(&ctx).await?;
self.get_zmq_config(&ctx).await?;
self.get_ws_config(&ctx).await?; self.get_ws_config(&ctx).await?;
Ok(()) Ok(())
} }
@ -445,30 +431,6 @@ impl Config {
Ok(()) Ok(())
} }
async fn get_zmq_config(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx
.client
.get(format!("{server}/config/zmq", server = ctx.flags.server))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: HashMap<String, ZMQCfg> = match req.json::<Vec<ZMQCfg>>().await {
Ok(res) => {
let mut out: HashMap<String, ZMQCfg> = HashMap::new();
res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.t.to_string(), x);
});
out
}
Err(err) => return Err(err),
};
self.zmq = data;
Ok(())
}
async fn get_ws_config(&mut self, ctx: &Context) -> Result<(), ReqError> { async fn get_ws_config(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx let resp: Result<Response, ReqError> = ctx
.client .client
@ -509,7 +471,7 @@ impl Config {
pub fn bootstrap_event(&self) -> IpEvent { pub fn bootstrap_event(&self) -> IpEvent {
IpEvent { IpEvent {
msgtype: String::from("bootstrap"), msgtype: String::from("bootstrap"),
mode: String::from("socket"), mode: String::from("ws"),
hostname: gethostname(true), hostname: gethostname(true),
ipdata: IpData { ipdata: IpData {
ip: "".to_string(), ip: "".to_string(),
@ -537,15 +499,6 @@ pub struct Set {
pub tryfail: i64, pub tryfail: i64,
} }
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ZMQCfg {
#[serde(rename = "type")]
pub t: String,
pub hostname: String,
pub port: i64,
pub subscription: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WebSocketCfg { pub struct WebSocketCfg {
#[serde(rename = "type")] #[serde(rename = "type")]
@ -592,7 +545,7 @@ mod test {
for _i in 0..10 { for _i in 0..10 {
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("zmq"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: IpData {
ip: "1.1.1.1".to_string(), ip: "1.1.1.1".to_string(),
@ -607,7 +560,7 @@ mod test {
for _ in 0..10 { for _ in 0..10 {
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("zmq"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: IpData {
ip: "1.1.1.2".to_string(), ip: "1.1.1.2".to_string(),
@ -621,7 +574,7 @@ mod test {
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("zmq"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: IpData {
ip: "1.1.1.3".to_string(), ip: "1.1.1.3".to_string(),
@ -634,7 +587,7 @@ mod test {
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("zmq"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: IpData {
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
@ -647,7 +600,7 @@ mod test {
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("zmq"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: IpData { ipdata: IpData {
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),

View File

@ -38,7 +38,7 @@ pub async fn run() {
// initialize sockets // initialize sockets
let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(WS_CHAN_SIZE); let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(WS_CHAN_SIZE);
let ctxws = Arc::clone(&ctxarc); let ctxws = Arc::clone(&ctxarc);
let wssocketrr = websocketinit(&ctxws, &ipeventtx).await; let mut wssocketrr = websocketinit(&ctxws, &ipeventtx).await;
let mut blrx = watchfiles(&ctxarc).await; let mut blrx = watchfiles(&ctxarc).await;
@ -49,7 +49,7 @@ pub async fn run() {
let bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone(); let bootstrap_event = ctxarc.read().await.cfg.bootstrap_event().clone();
send_to_ipbl_websocket(&wssocketrr, &bootstrap_event, &mut ret).await; send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event, &mut ret).await;
loop { loop {
ret = Vec::new(); ret = Vec::new();
@ -57,8 +57,8 @@ pub async fn run() {
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
tokio::select! { tokio::select! {
val = ipeventrx.recv() => { ipevent = ipeventrx.recv() => {
let received_ip = val.unwrap(); let received_ip = ipevent.unwrap();
let mut ctx = ctxclone.write().await; let mut ctx = ctxclone.write().await;
@ -70,7 +70,7 @@ pub async fn run() {
hostname: fqdn.clone(), hostname: fqdn.clone(),
ipdata: ip_to_send, ipdata: ip_to_send,
}; };
send_to_ipbl_websocket(&wssocketrr, &ipe, &mut ret).await; send_to_ipbl_websocket(&mut wssocketrr, &ipe, &mut ret).await;
} }
continue continue
} }
@ -84,15 +84,12 @@ pub async fn run() {
println!("sending {} to api and ws", ipevent.ipdata.ip); println!("sending {} to api and ws", ipevent.ipdata.ip);
let event = IpEvent{ let event = IpEvent{
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("socket"), mode: String::from("ws"),
hostname: fqdn.clone(), hostname: fqdn.clone(),
ipdata: ipevent.ipdata, ipdata: ipevent.ipdata,
}; };
println!("blabla1");
send_to_ipbl_api(&ctx.client, &ctx.hostname, &ctx.flags.server, &event, &mut ret).await; send_to_ipbl_api(&ctx.client, &ctx.hostname, &ctx.flags.server, &event, &mut ret).await;
println!("blabla2"); send_to_ipbl_websocket(&mut wssocketrr, &event, &mut ret).await;
send_to_ipbl_websocket(&wssocketrr, &event, &mut ret).await;
println!("blabla3");
} }
} }
} }

View File

@ -1,6 +1,6 @@
use crate::config::{Context, WebSocketCfg}; use crate::config::{Context, WebSocketCfg};
use crate::ip::IpEvent; use crate::ip::IpEvent;
use crate::utils::{gethostname, sleep_ms}; use crate::utils::gethostname;
use serde_json::json; use serde_json::json;
use std::net::TcpStream; use std::net::TcpStream;
@ -25,8 +25,8 @@ pub async fn websocketconnect<'a>(
pub async fn websocketinit( pub async fn websocketinit(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
ipeventtx: &Sender<IpEvent>, ipeventtx: &Sender<IpEvent>,
) -> Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>> { ) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (wssocketps, wssocketreqrep); let (wssocketps, wssocketrr);
{ {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
wssocketps = Arc::new(RwLock::new( wssocketps = Arc::new(RwLock::new(
@ -34,15 +34,13 @@ pub async fn websocketinit(
.await .await
.unwrap(), .unwrap(),
)); ));
wssocketreqrep = Arc::new(RwLock::new( wssocketrr = websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone())
websocketconnect(&ctx.cfg.ws.get("reqrep").unwrap(), ctx.hostname.clone()) .await
.await .unwrap();
.unwrap(),
));
} }
wslistenpubsub(wssocketps, ipeventtx.clone()).await; wslistenpubsub(wssocketps, ipeventtx.clone()).await;
return wssocketreqrep; return wssocketrr;
} }
async fn wslistenpubsub( async fn wslistenpubsub(
@ -51,99 +49,45 @@ async fn wslistenpubsub(
) { ) {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let msgs: Option<String>;
{ {
let mut ws = websocket.write().await; let mut ws = websocket.write().await;
msgs = match ws.read_message() { match ws.read_message() {
Ok(s) => { Ok(msg) => {
println!("msg: {}", s); let tosend: IpEvent =
None serde_json::from_str(msg.to_string().as_str()).unwrap();
}
Err(e) => {
println!("error: {e:?}");
ws.close(None).unwrap();
return;
}
};
match msgs {
Some(ss) => {
let tosend: IpEvent = serde_json::from_str(ss.as_str()).unwrap();
if tosend.ipdata.hostname != gethostname(true) if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string() || tosend.msgtype == "init".to_string()
{ {
txpubsub.send(tosend).await.unwrap(); txpubsub.send(tosend).await.unwrap();
} }
} }
None => {}
};
}
}
});
}
async fn wslistenreqrep(
websocket: Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
txpubsub: Sender<IpEvent>,
) {
tokio::spawn(async move {
loop {
let msgs: Option<String>;
{
let mut ws = websocket.write().await;
msgs = match ws.read_message() {
Ok(s) => {
println!("msg: {}", s);
None
}
Err(e) => { Err(e) => {
println!("error: {e:?}"); println!("error: {e:?}");
ws.close(None).unwrap(); ws.close(None).unwrap();
return; return;
} }
}; };
match msgs {
Some(ss) => {
let tosend: IpEvent = serde_json::from_str(ss.as_str()).unwrap();
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
} }
} }
}); });
} }
pub async fn send_to_ipbl_websocket( pub async fn send_to_ipbl_websocket(
wssocket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>, ws: &mut WebSocket<MaybeTlsStream<TcpStream>>,
ip: &IpEvent, ip: &IpEvent,
_ret: &mut Vec<String>, _ret: &mut Vec<String>,
) { ) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
println!("testarc");
let wsclone = Arc::clone(wssocket);
println!("testarc2");
let mut ws = wsclone.write().await;
println!("write");
match ws.write_message(Message::Text(msg)) { match ws.write_message(Message::Text(msg)) {
Ok(o) => { Ok(_) => {}
println!("{o:?}")
}
Err(e) => { Err(e) => {
println!("err 1: {e:?}") println!("err 1: {e:?}")
} }
}; };
println!("read");
match ws.read_message() { match ws.read_message() {
Ok(o) => { Ok(_) => {}
println!("{o}")
}
Err(e) => { Err(e) => {
println!("err 2: {e:?}") println!("err 2: {e:?}")
} }