diff --git a/src/api.rs b/src/api.rs new file mode 100644 index 0000000..7234eb4 --- /dev/null +++ b/src/api.rs @@ -0,0 +1,40 @@ +use crate::config::Context; + +use std::io; +use std::sync::Arc; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpSocket; +use tokio::sync::mpsc::Sender; +use tokio::sync::Mutex; + +pub async fn apiserver(ctxarc: &Arc>, apitx: Sender) -> io::Result<()> { + let ctxclone = Arc::clone(ctxarc); + let ctx = ctxclone.lock().await; + let addr = ctx.cfg.api.parse().unwrap(); + drop(ctx); + + let socket = TcpSocket::new_v4().unwrap(); + socket.bind(addr).unwrap(); + let listener = socket.listen(1024).unwrap(); + + tokio::spawn(async move { + loop { + match listener.accept().await { + Ok((mut socket, addr)) => { + let ctx = ctxclone.lock().await; + let (reader, mut writer) = socket.split(); + apitx.send(String::from("")).await.unwrap(); + let msg = format!("{:?}", ctx.blocklist.len()); + writer + .write_all(format!("{msg}\r\n").as_bytes()) + .await + .unwrap(); + writer.shutdown().await.unwrap(); + socket.shutdown().await.unwrap(); + } + Err(e) => println!("couldn't get client: {:?}", e), + } + } + }); + Ok(()) +} diff --git a/src/config.rs b/src/config.rs index c351594..026d543 100644 --- a/src/config.rs +++ b/src/config.rs @@ -49,7 +49,7 @@ pub struct Flags { } impl Context { - pub async fn new<'a>() -> Self { + pub async fn new() -> Self { // Get flags let argp: ArgMatches = Context::argparse(); //let debug: bool = argp.contains_id("debug"); @@ -96,7 +96,7 @@ impl Context { ctx } - pub fn argparse<'a>() -> ArgMatches { + pub fn argparse() -> ArgMatches { Command::new(env!("CARGO_PKG_NAME")) .version(format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION)) .author(env!("CARGO_PKG_AUTHORS")) @@ -274,6 +274,7 @@ pub struct Config { #[serde(skip_serializing)] pub trustnets: Vec, pub zmq: HashMap, + pub api: String, } impl Config { @@ -333,7 +334,8 @@ impl Config { hostname: MASTERSERVER.to_string(), port: 9998, subscription: String::new(), - })]) + })]), + api: String::from("127.0.0.1:8099") } } diff --git a/src/ipblc.rs b/src/ipblc.rs index 055bc6b..c229141 100644 --- a/src/ipblc.rs +++ b/src/ipblc.rs @@ -1,8 +1,10 @@ +use crate::api::apiserver; use crate::config::{Context, GIT_VERSION}; use crate::fw::{block, init}; -use crate::ip::{filter, push_ip, IpData}; -use crate::utils::{gethostname, read_lines, sleep_s}; -use crate::zmqcom::zconnect; +use crate::ip::{filter, IpData}; +use crate::utils::read_lines; +use crate::ws::send_to_ipbl_ws; +use crate::zmqcom::{send_to_ipbl_zmq, zmqinit}; use chrono::prelude::*; use chrono::prelude::{DateTime, Local}; @@ -15,6 +17,7 @@ use tokio::sync::Mutex; const BL_CHAN_SIZE: usize = 32; const ZMQ_CHAN_SIZE: usize = 64; +const API_CHAN_SIZE: usize = 64; pub async fn run() { let ctx = Arc::new(Mutex::new(Context::new().await)); @@ -24,27 +27,18 @@ pub async fn run() { format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION) ); - let (ipdatatx, mut ipdatarx): (Sender, Receiver) = channel(ZMQ_CHAN_SIZE); + let (apitx, mut apirx): (Sender, Receiver) = channel(API_CHAN_SIZE); + //let tcpsocket = apiinit(&ctx, &apitx).await; + + apiserver(&ctx, apitx).await.unwrap(); // initialize the firewall table init(&env!("CARGO_PKG_NAME").to_string()); let mut fwlen: usize = 0; // initialize zeromq sockets - let reqsocket; - let subsocket; - { - let ctxarc = Arc::clone(&ctx); - let zmqctx = ctxarc.lock().await; - reqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ) - .await - .unwrap(); - subsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB) - .await - .unwrap(); - } - - listenpubsub(&ctx, ipdatatx.clone(), subsocket).await; + let (ipdatatx, mut ipdatarx): (Sender, Receiver) = channel(ZMQ_CHAN_SIZE); + let zmqreqsocket = zmqinit(&ctx, &ipdatatx).await; let mut blrx = watchfiles(&ctx).await; @@ -60,61 +54,77 @@ pub async fn run() { hostname: "".to_string(), mode: "init".to_string(), }; - send_to_ipbl_zmq(&reqsocket, &mut ip_init).await; + send_to_ipbl_zmq(&zmqreqsocket, &mut ip_init).await; loop { let mut ret: Vec = Vec::new(); let begin: DateTime = Local::now().trunc_subsecs(0); // wait for logs parse and zmq channel receive - let mut received_ip = ipdatarx.recv().await.unwrap(); + //let mut received_ip = ipdatarx.recv(); + let r = ipdatarx.recv(); - // lock the context mutex - let ctxarc = Arc::clone(&ctx); - let mut ctx = ctxarc.lock().await; + let apimsg = apirx.recv(); - if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() { - for ip_to_send in &mut ctx.get_blocklist_toblock().await { - ip_to_send.mode = "init".to_string(); - send_to_ipbl_zmq(&reqsocket, ip_to_send).await; - } - continue; - } + let sl = tokio::time::sleep(tokio::time::Duration::from_millis(100)); - // refresh context blocklist - let filtered_ip = ctx.update_blocklist(&mut received_ip).await; - ctx.gc_blocklist().await; + tokio::select! { + val = r => { + let mut received_ip = val.unwrap(); + // lock the context mutex + let ctxarc = Arc::clone(&ctx); + let mut ctx = ctxarc.lock().await; - // send ip list to ws and zmq sockets - if let Some(mut ip) = filtered_ip { - send_to_ipbl_ws(&ctx, &mut ip, &mut ret).await; - send_to_ipbl_zmq(&reqsocket, &mut ip).await; - } - - // apply firewall blocking - block( - &env!("CARGO_PKG_NAME").to_string(), - &ctx.get_blocklist_toblock().await, - &mut ret, - &mut fwlen, - ) - .unwrap(); - - // log lines - if ret.len() > 0 { - println!("{ret}", ret = ret.join(", ")); - } - - let end: DateTime = Local::now().trunc_subsecs(0); - if (end - begin) > Duration::seconds(5) { - // reload configuration from the server - match ctx.load().await { - Ok(_) => {} - Err(err) => { - println!("error loading config: {err}"); + if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() { + for ip_to_send in &mut ctx.get_blocklist_toblock().await { + ip_to_send.mode = "init".to_string(); + send_to_ipbl_zmq(&zmqreqsocket, ip_to_send).await; + } + continue; } + + // refresh context blocklist + let filtered_ip = ctx.update_blocklist(&mut received_ip).await; + ctx.gc_blocklist().await; + + // send ip list to ws and zmq sockets + if let Some(mut ip) = filtered_ip { + send_to_ipbl_ws(&ctx, &mut ip, &mut ret).await; + send_to_ipbl_zmq(&zmqreqsocket, &mut ip).await; + } + + // apply firewall blocking + block( + &env!("CARGO_PKG_NAME").to_string(), + &ctx.get_blocklist_toblock().await, + &mut ret, + &mut fwlen, + ) + .unwrap(); + + // log lines + if ret.len() > 0 { + println!("{ret}", ret = ret.join(", ")); + } + + let end: DateTime = Local::now().trunc_subsecs(0); + if (end - begin) > Duration::seconds(5) { + // reload configuration from the server + match ctx.load().await { + Ok(_) => {} + Err(err) => { + println!("error loading config: {err}"); + } + } + }; } - } + _val = apimsg => { + continue; + } + _val = sl => { + continue; + } + }; } } @@ -222,79 +232,3 @@ impl std::fmt::Debug for FileEvent { write!(f, "{ie:?}", ie = self.inevent) } } - -async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) { - let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); - match reqsocket.send(&msg, 0) { - Ok(_) => {} - Err(e) => { - println!("{e:?}") - } - }; - match reqsocket.recv_string(0) { - Ok(o) => match o { - Ok(_) => {} - Err(ee) => { - println!("{ee:?}") - } - }, - Err(e) => { - println!("{e:?}") - } - }; -} - -async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec) { - ret.push(format!("host: {hostname}", hostname = ctx.hostname)); - loop { - match push_ip(&ctx, &ip, ret).await { - Ok(_) => { - break; - } - Err(err) => { - println!("{err}"); - sleep_s(1); - } - }; - } -} - -async fn listenpubsub(ctx: &Arc>, txpubsub: Sender, socket: zmq::Socket) { - let ctx = ctx.lock().await; - let prefix = format!( - "{sub} ", - sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription - ); - socket - .set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes()) - .expect("failed setting subscription"); - drop(ctx); - - tokio::spawn(async move { - loop { - let msgs: Option = match socket.recv_string(0) { - Ok(s) => match s { - Ok(ss) => Some(ss), - Err(e) => { - println!("{e:?}"); - None - } - }, - Err(e) => { - println!("{e:?}"); - None - } - }; - match msgs { - Some(ss) => { - let msg = ss.strip_prefix(prefix.as_str()).unwrap(); - let tosend: IpData = serde_json::from_str(msg).unwrap(); - if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() { - txpubsub.send(tosend).await.unwrap(); - } - } - None => {} - }; - } - }); -} diff --git a/src/main.rs b/src/main.rs index 30c6b8e..db2c7e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,10 @@ +mod api; mod config; mod fw; mod ip; mod ipblc; mod utils; +mod ws; mod zmqcom; #[tokio::main] diff --git a/src/ws.rs b/src/ws.rs new file mode 100644 index 0000000..1cd7bad --- /dev/null +++ b/src/ws.rs @@ -0,0 +1,18 @@ +use crate::config::Context; +use crate::ip::{push_ip, IpData}; +use crate::utils::sleep_s; + +pub async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec) { + ret.push(format!("host: {hostname}", hostname = ctx.hostname)); + loop { + match push_ip(&ctx, &ip, ret).await { + Ok(_) => { + break; + } + Err(err) => { + println!("{err}"); + sleep_s(1); + } + }; + } +} diff --git a/src/zmqcom.rs b/src/zmqcom.rs index 7733ca9..943f2ed 100644 --- a/src/zmqcom.rs +++ b/src/zmqcom.rs @@ -1,8 +1,17 @@ -use crate::config::ZMQ; +use crate::config::{Context, ZMQ}; +use crate::ip::IpData; +use crate::utils::gethostname; + +use std::sync::Arc; +use tokio::sync::mpsc::Sender; +use tokio::sync::Mutex; const ZMQPROTO: &str = "tcp"; -pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result { +pub async fn zconnect<'a>( + zmqcfg: &ZMQ, + zmqtype: zmq::SocketType, +) -> Result { let zctx = zmq::Context::new(); let zmqhost = &zmqcfg.hostname; let zmqport = &zmqcfg.port; @@ -11,3 +20,83 @@ pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result>, ipdatatx: &Sender) -> zmq::Socket { + let ctxarc = Arc::clone(&ctx); + + let zmqreqsocket; + let zmqsubsocket; + { + let zmqctx = ctxarc.lock().await; + zmqreqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ) + .await + .unwrap(); + zmqsubsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB) + .await + .unwrap(); + } + + listenpubsub(&ctx, ipdatatx.clone(), zmqsubsocket).await; + return zmqreqsocket; +} + +async fn listenpubsub(ctx: &Arc>, txpubsub: Sender, socket: zmq::Socket) { + let ctx = ctx.lock().await; + let prefix = format!( + "{sub} ", + sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription + ); + socket + .set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes()) + .expect("failed setting subscription"); + drop(ctx); + + tokio::spawn(async move { + loop { + let msgs: Option = match socket.recv_string(0) { + Ok(s) => match s { + Ok(ss) => Some(ss), + Err(e) => { + println!("{e:?}"); + None + } + }, + Err(e) => { + println!("{e:?}"); + None + } + }; + match msgs { + Some(ss) => { + let msg = ss.strip_prefix(prefix.as_str()).unwrap(); + let tosend: IpData = serde_json::from_str(msg).unwrap(); + if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() { + txpubsub.send(tosend).await.unwrap(); + } + } + None => {} + }; + } + }); +} + +pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) { + let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); + match reqsocket.send(&msg, 0) { + Ok(_) => {} + Err(e) => { + println!("{e:?}") + } + }; + match reqsocket.recv_string(0) { + Ok(o) => match o { + Ok(_) => {} + Err(ee) => { + println!("{ee:?}") + } + }, + Err(e) => { + println!("{e:?}") + } + }; +}