2023-03-05 23:05:50 +01:00
|
|
|
use crate::config::{Context, ZMQCfg};
|
2023-01-15 16:05:34 +01:00
|
|
|
use crate::ip::IpEvent;
|
2023-01-08 14:09:13 +01:00
|
|
|
use crate::utils::gethostname;
|
|
|
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
use tokio::sync::mpsc::Sender;
|
2023-01-10 18:00:40 +01:00
|
|
|
use tokio::sync::RwLock;
|
2022-05-27 13:59:17 +02:00
|
|
|
|
|
|
|
/// Transport scheme placed before `://` when `zconnect` builds the ZeroMQ endpoint string.
const ZMQPROTO: &str = "tcp";
|
|
|
|
|
2023-01-08 14:09:13 +01:00
|
|
|
pub async fn zconnect<'a>(
|
2023-03-05 23:05:50 +01:00
|
|
|
zmqcfg: &ZMQCfg,
|
2023-01-08 14:09:13 +01:00
|
|
|
zmqtype: zmq::SocketType,
|
|
|
|
) -> Result<zmq::Socket, zmq::Error> {
|
2022-05-27 13:59:17 +02:00
|
|
|
let zctx = zmq::Context::new();
|
|
|
|
let zmqhost = &zmqcfg.hostname;
|
2022-12-30 20:18:15 +01:00
|
|
|
let zmqport = &zmqcfg.port;
|
2022-05-27 13:59:17 +02:00
|
|
|
let socket = zctx.socket(zmqtype).unwrap();
|
|
|
|
let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}");
|
|
|
|
socket.connect(&connectstring.as_str())?;
|
|
|
|
Ok(socket)
|
|
|
|
}
|
2023-01-08 14:09:13 +01:00
|
|
|
|
2023-01-15 16:05:34 +01:00
|
|
|
pub async fn zmqinit(ctx: &Arc<RwLock<Context>>, ipeventtx: &Sender<IpEvent>) -> zmq::Socket {
|
2023-01-08 14:09:13 +01:00
|
|
|
let ctxarc = Arc::clone(&ctx);
|
|
|
|
|
|
|
|
let zmqreqsocket;
|
|
|
|
let zmqsubsocket;
|
|
|
|
{
|
2023-01-10 18:00:40 +01:00
|
|
|
let zmqctx = ctxarc.read().await;
|
2023-01-08 14:09:13 +01:00
|
|
|
zmqreqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
zmqsubsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
}
|
|
|
|
|
2023-01-15 16:05:34 +01:00
|
|
|
listenpubsub(&ctx, ipeventtx.clone(), zmqsubsocket).await;
|
2023-01-08 14:09:13 +01:00
|
|
|
return zmqreqsocket;
|
|
|
|
}
|
|
|
|
|
2023-01-15 16:05:34 +01:00
|
|
|
async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpEvent>, socket: zmq::Socket) {
|
2023-01-08 21:16:06 +01:00
|
|
|
let prefix;
|
|
|
|
{
|
2023-01-10 18:00:40 +01:00
|
|
|
let ctx = ctx.read().await;
|
2023-01-08 21:16:06 +01:00
|
|
|
prefix = format!(
|
|
|
|
"{sub} ",
|
|
|
|
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
|
|
|
|
);
|
|
|
|
socket
|
|
|
|
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
|
|
|
|
.expect("failed setting subscription");
|
|
|
|
}
|
2023-01-08 14:09:13 +01:00
|
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
loop {
|
|
|
|
let msgs: Option<String> = match socket.recv_string(0) {
|
|
|
|
Ok(s) => match s {
|
|
|
|
Ok(ss) => Some(ss),
|
|
|
|
Err(e) => {
|
|
|
|
println!("{e:?}");
|
|
|
|
None
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Err(e) => {
|
|
|
|
println!("{e:?}");
|
|
|
|
None
|
|
|
|
}
|
|
|
|
};
|
|
|
|
match msgs {
|
|
|
|
Some(ss) => {
|
|
|
|
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
|
2023-01-15 16:05:34 +01:00
|
|
|
let tosend: IpEvent = serde_json::from_str(msg).unwrap();
|
|
|
|
if tosend.ipdata.hostname != gethostname(true)
|
|
|
|
|| tosend.msgtype == "init".to_string()
|
|
|
|
{
|
2023-01-08 14:09:13 +01:00
|
|
|
txpubsub.send(tosend).await.unwrap();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
None => {}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2023-01-15 22:07:56 +01:00
|
|
|
pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent, _ret: &mut Vec<String>) {
|
2023-01-08 14:09:13 +01:00
|
|
|
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
|
|
|
|
match reqsocket.send(&msg, 0) {
|
|
|
|
Ok(_) => {}
|
|
|
|
Err(e) => {
|
|
|
|
println!("{e:?}")
|
|
|
|
}
|
|
|
|
};
|
|
|
|
match reqsocket.recv_string(0) {
|
|
|
|
Ok(o) => match o {
|
|
|
|
Ok(_) => {}
|
|
|
|
Err(ee) => {
|
|
|
|
println!("{ee:?}")
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Err(e) => {
|
|
|
|
println!("{e:?}")
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|