diff --git a/src/config.rs b/src/config.rs index ff6c9e7..8c3f874 100644 --- a/src/config.rs +++ b/src/config.rs @@ -194,7 +194,7 @@ impl Context { .entry(ipdata.ip.to_string()) .or_insert(BlockIpData { ipdata: ipdata.clone(), - tryfail: 100, + tryfail: set.tryfail, starttime, blocktime: set.blocktime, }); diff --git a/src/ipblc.rs b/src/ipblc.rs index 7b579fe..4190d84 100644 --- a/src/ipblc.rs +++ b/src/ipblc.rs @@ -2,7 +2,7 @@ use crate::api::apiserver; use crate::config::{Context, GIT_VERSION}; use crate::fw::{fwblock, fwinit}; use crate::ip::{filter, IpData, IpEvent}; -use crate::utils::read_lines; +use crate::utils::{read_lines, sleep_ms}; use crate::ws::send_to_ipbl_ws; use crate::zmqcom::{send_to_ipbl_zmq, zmqinit}; @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; -use tokio::time::sleep; pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); const BL_CHAN_SIZE: usize = 32; @@ -22,7 +21,9 @@ const ZMQ_CHAN_SIZE: usize = 64; const API_CHAN_SIZE: usize = 64; pub async fn run() { - let ctxarc = Arc::new(RwLock::new(Context::new().await)); + let globalctx = Context::new().await; + let ctxarc = Arc::new(RwLock::new(globalctx)); + let mut ret: Vec = Vec::new(); let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); @@ -60,16 +61,16 @@ pub async fn run() { mode: "".to_string(), }, }; - send_to_ipbl_zmq(&zmqreqsocket, &ipevent_bootstrap).await; + send_to_ipbl_zmq(&zmqreqsocket, &ipevent_bootstrap, &mut ret).await; loop { - let mut ret: Vec = Vec::new(); + ret = Vec::new(); // wait for logs parse and zmq channel receive //let mut received_ip = ipdatarx.recv(); let ipdata_wait = ipeventrx.recv(); let apimsg_wait = apirx.recv(); - let force_wait = sleep(tokio::time::Duration::from_millis(200)); + let force_wait = sleep_ms(500); let ctxclone = Arc::clone(&ctxarc); @@ -85,7 +86,7 @@ pub async fn run() { msgtype: String::from("init"), ipdata: ip_to_send, }; - send_to_ipbl_zmq(&zmqreqsocket, &ipe).await; + send_to_ipbl_zmq(&zmqreqsocket, &ipe, &mut ret).await; } continue } @@ -97,17 +98,16 @@ pub async fn run() { if let Some(ip) = filtered_ip { if received_ip.msgtype != "init" { println!("sending {} to ws and zmq", ip.ip); - send_to_ipbl_ws(&ctx, &ip, &mut ret).await; let event = IpEvent{ msgtype: String::from("add"), ipdata: ip, }; - send_to_ipbl_zmq(&zmqreqsocket, &event).await; + send_to_ipbl_ws(&ctx, &event, &mut ret).await; + send_to_ipbl_zmq(&zmqreqsocket, &event, &mut ret).await; } } } - _val = apimsg_wait => { - } + _val = apimsg_wait => {} _val = force_wait => {} }; diff --git a/src/utils.rs b/src/utils.rs index 3b754ca..a746e14 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -33,7 +33,7 @@ pub fn _dedup(list: &mut Vec) -> usize { list.len() } -pub async fn _sleep_ms(ms: u64) { +pub async fn sleep_ms(ms: u64) { sleep(Duration::from_millis(ms)).await; } diff --git a/src/ws.rs b/src/ws.rs index 91dc829..20818a7 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -1,13 +1,13 @@ use crate::config::Context; -use crate::ip::IpData; +use crate::ip::{IpData, IpEvent}; use crate::utils::sleep_s; use reqwest::Error as ReqError; -pub async fn send_to_ipbl_ws(ctx: &Context, ip: &IpData, ret: &mut Vec) { +pub async fn send_to_ipbl_ws(ctx: &Context, ip: &IpEvent, ret: &mut Vec) { ret.push(format!("host: {hostname}", hostname = ctx.hostname)); loop { - match push_ip(&ctx, &ip, ret).await { + match push_ip(&ctx, &ip.ipdata, ret).await { Ok(_) => { break; } diff --git a/src/zmqcom.rs b/src/zmqcom.rs index e87f0dd..2fb6365 100644 --- a/src/zmqcom.rs +++ b/src/zmqcom.rs @@ -84,7 +84,7 @@ async fn listenpubsub(ctx: &Arc>, txpubsub: Sender, soc }); } -pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent) { +pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent, _ret: &mut Vec) { let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); match reqsocket.send(&msg, 0) { Ok(_) => {}