ipblc/old/zmqcom.rs

107 lines
3.1 KiB
Rust
Raw Permalink Normal View History

2023-03-05 23:05:50 +01:00
use crate::config::{Context, ZMQCfg};
2023-01-15 16:05:34 +01:00
use crate::ip::IpEvent;
2023-01-08 14:09:13 +01:00
use crate::utils::gethostname;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
2023-01-10 18:00:40 +01:00
use tokio::sync::RwLock;
/// Transport scheme placed in front of `://host:port` when `zconnect` builds
/// the endpoint string for a socket.
const ZMQPROTO: &str = "tcp";
2023-01-08 14:09:13 +01:00
pub async fn zconnect<'a>(
2023-03-05 23:05:50 +01:00
zmqcfg: &ZMQCfg,
2023-01-08 14:09:13 +01:00
zmqtype: zmq::SocketType,
) -> Result<zmq::Socket, zmq::Error> {
let zctx = zmq::Context::new();
let zmqhost = &zmqcfg.hostname;
2022-12-30 20:18:15 +01:00
let zmqport = &zmqcfg.port;
let socket = zctx.socket(zmqtype).unwrap();
let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}");
socket.connect(&connectstring.as_str())?;
Ok(socket)
}
2023-01-08 14:09:13 +01:00
2023-01-15 16:05:34 +01:00
pub async fn zmqinit(ctx: &Arc<RwLock<Context>>, ipeventtx: &Sender<IpEvent>) -> zmq::Socket {
2023-01-08 14:09:13 +01:00
let ctxarc = Arc::clone(&ctx);
let zmqreqsocket;
let zmqsubsocket;
{
2023-01-10 18:00:40 +01:00
let zmqctx = ctxarc.read().await;
2023-01-08 14:09:13 +01:00
zmqreqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await
.unwrap();
zmqsubsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
.await
.unwrap();
}
2023-01-15 16:05:34 +01:00
listenpubsub(&ctx, ipeventtx.clone(), zmqsubsocket).await;
2023-01-08 14:09:13 +01:00
return zmqreqsocket;
}
2023-01-15 16:05:34 +01:00
async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpEvent>, socket: zmq::Socket) {
2023-01-08 21:16:06 +01:00
let prefix;
{
2023-01-10 18:00:40 +01:00
let ctx = ctx.read().await;
2023-01-08 21:16:06 +01:00
prefix = format!(
"{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
}
2023-01-08 14:09:13 +01:00
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(e) => {
println!("{e:?}");
None
}
},
Err(e) => {
println!("{e:?}");
None
}
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
2023-01-15 16:05:34 +01:00
let tosend: IpEvent = serde_json::from_str(msg).unwrap();
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
2023-01-08 14:09:13 +01:00
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}
2023-01-15 22:07:56 +01:00
pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent, _ret: &mut Vec<String>) {
2023-01-08 14:09:13 +01:00
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}
Err(e) => {
println!("{e:?}")
}
};
match reqsocket.recv_string(0) {
Ok(o) => match o {
Ok(_) => {}
Err(ee) => {
println!("{ee:?}")
}
},
Err(e) => {
println!("{e:?}")
}
};
}