fixed sleep timeout
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
Paul 2023-01-15 22:07:56 +01:00
parent a84d3d0ed8
commit bb9404ec7c
5 changed files with 17 additions and 17 deletions

View File

@ -194,7 +194,7 @@ impl Context {
.entry(ipdata.ip.to_string()) .entry(ipdata.ip.to_string())
.or_insert(BlockIpData { .or_insert(BlockIpData {
ipdata: ipdata.clone(), ipdata: ipdata.clone(),
tryfail: 100, tryfail: set.tryfail,
starttime, starttime,
blocktime: set.blocktime, blocktime: set.blocktime,
}); });

View File

@ -2,7 +2,7 @@ use crate::api::apiserver;
use crate::config::{Context, GIT_VERSION}; use crate::config::{Context, GIT_VERSION};
use crate::fw::{fwblock, fwinit}; use crate::fw::{fwblock, fwinit};
use crate::ip::{filter, IpData, IpEvent}; use crate::ip::{filter, IpData, IpEvent};
use crate::utils::read_lines; use crate::utils::{read_lines, sleep_ms};
use crate::ws::send_to_ipbl_ws; use crate::ws::send_to_ipbl_ws;
use crate::zmqcom::{send_to_ipbl_zmq, zmqinit}; use crate::zmqcom::{send_to_ipbl_zmq, zmqinit};
@ -14,7 +14,6 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tokio::time::sleep;
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
const BL_CHAN_SIZE: usize = 32; const BL_CHAN_SIZE: usize = 32;
@ -22,7 +21,9 @@ const ZMQ_CHAN_SIZE: usize = 64;
const API_CHAN_SIZE: usize = 64; const API_CHAN_SIZE: usize = 64;
pub async fn run() { pub async fn run() {
let ctxarc = Arc::new(RwLock::new(Context::new().await)); let globalctx = Context::new().await;
let ctxarc = Arc::new(RwLock::new(globalctx));
let mut ret: Vec<String> = Vec::new();
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
@ -60,16 +61,16 @@ pub async fn run() {
mode: "".to_string(), mode: "".to_string(),
}, },
}; };
send_to_ipbl_zmq(&zmqreqsocket, &ipevent_bootstrap).await; send_to_ipbl_zmq(&zmqreqsocket, &ipevent_bootstrap, &mut ret).await;
loop { loop {
let mut ret: Vec<String> = Vec::new(); ret = Vec::new();
// wait for logs parse and zmq channel receive // wait for logs parse and zmq channel receive
//let mut received_ip = ipdatarx.recv(); //let mut received_ip = ipdatarx.recv();
let ipdata_wait = ipeventrx.recv(); let ipdata_wait = ipeventrx.recv();
let apimsg_wait = apirx.recv(); let apimsg_wait = apirx.recv();
let force_wait = sleep(tokio::time::Duration::from_millis(200)); let force_wait = sleep_ms(500);
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
@ -85,7 +86,7 @@ pub async fn run() {
msgtype: String::from("init"), msgtype: String::from("init"),
ipdata: ip_to_send, ipdata: ip_to_send,
}; };
send_to_ipbl_zmq(&zmqreqsocket, &ipe).await; send_to_ipbl_zmq(&zmqreqsocket, &ipe, &mut ret).await;
} }
continue continue
} }
@ -97,17 +98,16 @@ pub async fn run() {
if let Some(ip) = filtered_ip { if let Some(ip) = filtered_ip {
if received_ip.msgtype != "init" { if received_ip.msgtype != "init" {
println!("sending {} to ws and zmq", ip.ip); println!("sending {} to ws and zmq", ip.ip);
send_to_ipbl_ws(&ctx, &ip, &mut ret).await;
let event = IpEvent{ let event = IpEvent{
msgtype: String::from("add"), msgtype: String::from("add"),
ipdata: ip, ipdata: ip,
}; };
send_to_ipbl_zmq(&zmqreqsocket, &event).await; send_to_ipbl_ws(&ctx, &event, &mut ret).await;
send_to_ipbl_zmq(&zmqreqsocket, &event, &mut ret).await;
} }
} }
} }
_val = apimsg_wait => { _val = apimsg_wait => {}
}
_val = force_wait => {} _val = force_wait => {}
}; };

View File

@ -33,7 +33,7 @@ pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
list.len() list.len()
} }
pub async fn _sleep_ms(ms: u64) { pub async fn sleep_ms(ms: u64) {
sleep(Duration::from_millis(ms)).await; sleep(Duration::from_millis(ms)).await;
} }

View File

@ -1,13 +1,13 @@
use crate::config::Context; use crate::config::Context;
use crate::ip::IpData; use crate::ip::{IpData, IpEvent};
use crate::utils::sleep_s; use crate::utils::sleep_s;
use reqwest::Error as ReqError; use reqwest::Error as ReqError;
pub async fn send_to_ipbl_ws(ctx: &Context, ip: &IpData, ret: &mut Vec<String>) { pub async fn send_to_ipbl_ws(ctx: &Context, ip: &IpEvent, ret: &mut Vec<String>) {
ret.push(format!("host: {hostname}", hostname = ctx.hostname)); ret.push(format!("host: {hostname}", hostname = ctx.hostname));
loop { loop {
match push_ip(&ctx, &ip, ret).await { match push_ip(&ctx, &ip.ipdata, ret).await {
Ok(_) => { Ok(_) => {
break; break;
} }

View File

@ -84,7 +84,7 @@ async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpEvent>, soc
}); });
} }
pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent) { pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent, _ret: &mut Vec<String>) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) { match reqsocket.send(&msg, 0) {
Ok(_) => {} Ok(_) => {}