changed IpData->IpEvent{IpData} struct
Some checks reported errors
continuous-integration/drone/push Build encountered an error

This commit is contained in:
Paul 2023-01-15 16:05:34 +01:00
parent ee5119c512
commit e8c7172219
3 changed files with 49 additions and 27 deletions

View File

@ -18,6 +18,12 @@ lazy_static! {
static ref R_DATE: Regex = Regex::new(include_str!("regexps/date.txt")).unwrap();
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IpEvent {
pub msgtype: String,
pub ipdata: IpData,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq)]
pub struct IpData {
pub ip: String,

View File

@ -1,7 +1,7 @@
use crate::api::apiserver;
use crate::config::{Context, GIT_VERSION};
use crate::fw::{fwblock, fwinit};
use crate::ip::{filter, IpData};
use crate::ip::{filter, IpData, IpEvent};
use crate::utils::read_lines;
use crate::ws::send_to_ipbl_ws;
use crate::zmqcom::{send_to_ipbl_zmq, zmqinit};
@ -39,31 +39,34 @@ pub async fn run() {
let mut fwlen: usize = 0;
// initialize zeromq sockets
let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE);
let zmqreqsocket = zmqinit(&ctxclone, &ipdatatx).await;
let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(ZMQ_CHAN_SIZE);
let zmqreqsocket = zmqinit(&ctxclone, &ipeventtx).await;
let mut blrx = watchfiles(&ctxclone).await;
let ctxclone = Arc::clone(&ctxarc);
tokio::spawn(async move {
compare_files_changes(&ctxclone, &mut blrx, &ipdatatx).await;
compare_files_changes(&ctxclone, &mut blrx, &ipeventtx).await;
});
let mut ip_init = IpData {
let ipevent_init = IpEvent {
msgtype: String::from("init"),
ipdata: IpData {
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
mode: "init".to_string(),
mode: "".to_string(),
},
};
send_to_ipbl_zmq(&zmqreqsocket, &mut ip_init).await;
send_to_ipbl_zmq(&zmqreqsocket, &ipevent_init).await;
loop {
let mut ret: Vec<String> = Vec::new();
// wait for logs parse and zmq channel receive
//let mut received_ip = ipdatarx.recv();
let ipdata_wait = ipdatarx.recv();
let ipdata_wait = ipeventrx.recv();
let apimsg_wait = apirx.recv();
let ctxclone = Arc::clone(&ctxarc);
@ -74,22 +77,29 @@ pub async fn run() {
let mut ctx = ctxclone.write().await;
if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() {
for ip_to_send in &mut ctx.get_blocklist_toblock().await {
ip_to_send.mode = "init".to_string();
send_to_ipbl_zmq(&zmqreqsocket, ip_to_send).await;
if received_ip.ipdata.ip == "".to_string() && received_ip.msgtype == "init".to_string() {
for ip_to_send in ctx.get_blocklist_toblock().await {
let event = IpEvent{
msgtype: String::from("init"),
ipdata: ip_to_send,
};
send_to_ipbl_zmq(&zmqreqsocket, &event).await;
}
continue;
}
// refresh context blocklist
let filtered_ip = ctx.update_blocklist(&received_ip).await;
let filtered_ip = ctx.update_blocklist(&received_ip.ipdata).await;
// send ip list to ws and zmq sockets
if let Some(ip) = filtered_ip {
println!("sending {} to ws and zmq", ip.ip);
send_to_ipbl_ws(&ctx, &ip, &mut ret).await;
send_to_ipbl_zmq(&zmqreqsocket, &ip).await;
let event = IpEvent{
msgtype: String::from("add"),
ipdata: ip,
};
send_to_ipbl_zmq(&zmqreqsocket, &event).await;
}
}
_val = apimsg_wait => {
@ -171,7 +181,7 @@ async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, b
async fn compare_files_changes(
ctxarc: &Arc<RwLock<Context>>,
inrx: &mut Receiver<FileEvent>,
ipdatatx: &Sender<IpData>,
ipeventtx: &Sender<IpEvent>,
) {
let mut tnets;
loop {
@ -231,7 +241,11 @@ async fn compare_files_changes(
}
}
for ip in iplist {
ipdatatx.send(ip).await.unwrap();
let ipevent = IpEvent {
msgtype: String::from("file"),
ipdata: ip,
};
ipeventtx.send(ipevent).await.unwrap();
}
}
None => {}

View File

@ -1,5 +1,5 @@
use crate::config::{Context, ZMQ};
use crate::ip::IpData;
use crate::ip::IpEvent;
use crate::utils::gethostname;
use std::sync::Arc;
@ -21,7 +21,7 @@ pub async fn zconnect<'a>(
Ok(socket)
}
pub async fn zmqinit(ctx: &Arc<RwLock<Context>>, ipdatatx: &Sender<IpData>) -> zmq::Socket {
pub async fn zmqinit(ctx: &Arc<RwLock<Context>>, ipeventtx: &Sender<IpEvent>) -> zmq::Socket {
let ctxarc = Arc::clone(&ctx);
let zmqreqsocket;
@ -36,11 +36,11 @@ pub async fn zmqinit(ctx: &Arc<RwLock<Context>>, ipdatatx: &Sender<IpData>) -> z
.unwrap();
}
listenpubsub(&ctx, ipdatatx.clone(), zmqsubsocket).await;
listenpubsub(&ctx, ipeventtx.clone(), zmqsubsocket).await;
return zmqreqsocket;
}
async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpEvent>, socket: zmq::Socket) {
let prefix;
{
let ctx = ctx.read().await;
@ -71,8 +71,10 @@ async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpData>, sock
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpData = serde_json::from_str(msg).unwrap();
if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() {
let tosend: IpEvent = serde_json::from_str(msg).unwrap();
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
txpubsub.send(tosend).await.unwrap();
}
}
@ -82,7 +84,7 @@ async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpData>, sock
});
}
pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpData) {
pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}