implemented first tcp api server

This commit is contained in:
Paul 2023-01-08 14:09:13 +01:00
parent 06cb6f72be
commit 5375446303
6 changed files with 227 additions and 142 deletions

40
src/api.rs Normal file
View File

@ -0,0 +1,40 @@
use crate::config::Context;
use std::io;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpSocket;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
pub async fn apiserver(ctxarc: &Arc<Mutex<Context>>, apitx: Sender<String>) -> io::Result<()> {
let ctxclone = Arc::clone(ctxarc);
let ctx = ctxclone.lock().await;
let addr = ctx.cfg.api.parse().unwrap();
drop(ctx);
let socket = TcpSocket::new_v4().unwrap();
socket.bind(addr).unwrap();
let listener = socket.listen(1024).unwrap();
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((mut socket, addr)) => {
let ctx = ctxclone.lock().await;
let (reader, mut writer) = socket.split();
apitx.send(String::from("")).await.unwrap();
let msg = format!("{:?}", ctx.blocklist.len());
writer
.write_all(format!("{msg}\r\n").as_bytes())
.await
.unwrap();
writer.shutdown().await.unwrap();
socket.shutdown().await.unwrap();
}
Err(e) => println!("couldn't get client: {:?}", e),
}
}
});
Ok(())
}

View File

@ -49,7 +49,7 @@ pub struct Flags {
}
impl Context {
pub async fn new<'a>() -> Self {
pub async fn new() -> Self {
// Get flags
let argp: ArgMatches = Context::argparse();
//let debug: bool = argp.contains_id("debug");
@ -96,7 +96,7 @@ impl Context {
ctx
}
pub fn argparse<'a>() -> ArgMatches {
pub fn argparse() -> ArgMatches {
Command::new(env!("CARGO_PKG_NAME"))
.version(format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION))
.author(env!("CARGO_PKG_AUTHORS"))
@ -274,6 +274,7 @@ pub struct Config {
#[serde(skip_serializing)]
pub trustnets: Vec<String>,
pub zmq: HashMap<String, ZMQ>,
pub api: String,
}
impl Config {
@ -333,7 +334,8 @@ impl Config {
hostname: MASTERSERVER.to_string(),
port: 9998,
subscription: String::new(),
})])
})]),
api: String::from("127.0.0.1:8099")
}
}

View File

@ -1,8 +1,10 @@
use crate::api::apiserver;
use crate::config::{Context, GIT_VERSION};
use crate::fw::{block, init};
use crate::ip::{filter, push_ip, IpData};
use crate::utils::{gethostname, read_lines, sleep_s};
use crate::zmqcom::zconnect;
use crate::ip::{filter, IpData};
use crate::utils::read_lines;
use crate::ws::send_to_ipbl_ws;
use crate::zmqcom::{send_to_ipbl_zmq, zmqinit};
use chrono::prelude::*;
use chrono::prelude::{DateTime, Local};
@ -15,6 +17,7 @@ use tokio::sync::Mutex;
const BL_CHAN_SIZE: usize = 32;
const ZMQ_CHAN_SIZE: usize = 64;
const API_CHAN_SIZE: usize = 64;
pub async fn run() {
let ctx = Arc::new(Mutex::new(Context::new().await));
@ -24,27 +27,18 @@ pub async fn run() {
format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION)
);
let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE);
let (apitx, mut apirx): (Sender<String>, Receiver<String>) = channel(API_CHAN_SIZE);
//let tcpsocket = apiinit(&ctx, &apitx).await;
apiserver(&ctx, apitx).await.unwrap();
// initialize the firewall table
init(&env!("CARGO_PKG_NAME").to_string());
let mut fwlen: usize = 0;
// initialize zeromq sockets
let reqsocket;
let subsocket;
{
let ctxarc = Arc::clone(&ctx);
let zmqctx = ctxarc.lock().await;
reqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await
.unwrap();
subsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
.await
.unwrap();
}
listenpubsub(&ctx, ipdatatx.clone(), subsocket).await;
let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE);
let zmqreqsocket = zmqinit(&ctx, &ipdatatx).await;
let mut blrx = watchfiles(&ctx).await;
@ -60,61 +54,77 @@ pub async fn run() {
hostname: "".to_string(),
mode: "init".to_string(),
};
send_to_ipbl_zmq(&reqsocket, &mut ip_init).await;
send_to_ipbl_zmq(&zmqreqsocket, &mut ip_init).await;
loop {
let mut ret: Vec<String> = Vec::new();
let begin: DateTime<Local> = Local::now().trunc_subsecs(0);
// wait for logs parse and zmq channel receive
let mut received_ip = ipdatarx.recv().await.unwrap();
//let mut received_ip = ipdatarx.recv();
let r = ipdatarx.recv();
// lock the context mutex
let ctxarc = Arc::clone(&ctx);
let mut ctx = ctxarc.lock().await;
let apimsg = apirx.recv();
if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() {
for ip_to_send in &mut ctx.get_blocklist_toblock().await {
ip_to_send.mode = "init".to_string();
send_to_ipbl_zmq(&reqsocket, ip_to_send).await;
}
continue;
}
let sl = tokio::time::sleep(tokio::time::Duration::from_millis(100));
// refresh context blocklist
let filtered_ip = ctx.update_blocklist(&mut received_ip).await;
ctx.gc_blocklist().await;
tokio::select! {
val = r => {
let mut received_ip = val.unwrap();
// lock the context mutex
let ctxarc = Arc::clone(&ctx);
let mut ctx = ctxarc.lock().await;
// send ip list to ws and zmq sockets
if let Some(mut ip) = filtered_ip {
send_to_ipbl_ws(&ctx, &mut ip, &mut ret).await;
send_to_ipbl_zmq(&reqsocket, &mut ip).await;
}
// apply firewall blocking
block(
&env!("CARGO_PKG_NAME").to_string(),
&ctx.get_blocklist_toblock().await,
&mut ret,
&mut fwlen,
)
.unwrap();
// log lines
if ret.len() > 0 {
println!("{ret}", ret = ret.join(", "));
}
let end: DateTime<Local> = Local::now().trunc_subsecs(0);
if (end - begin) > Duration::seconds(5) {
// reload configuration from the server
match ctx.load().await {
Ok(_) => {}
Err(err) => {
println!("error loading config: {err}");
if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() {
for ip_to_send in &mut ctx.get_blocklist_toblock().await {
ip_to_send.mode = "init".to_string();
send_to_ipbl_zmq(&zmqreqsocket, ip_to_send).await;
}
continue;
}
// refresh context blocklist
let filtered_ip = ctx.update_blocklist(&mut received_ip).await;
ctx.gc_blocklist().await;
// send ip list to ws and zmq sockets
if let Some(mut ip) = filtered_ip {
send_to_ipbl_ws(&ctx, &mut ip, &mut ret).await;
send_to_ipbl_zmq(&zmqreqsocket, &mut ip).await;
}
// apply firewall blocking
block(
&env!("CARGO_PKG_NAME").to_string(),
&ctx.get_blocklist_toblock().await,
&mut ret,
&mut fwlen,
)
.unwrap();
// log lines
if ret.len() > 0 {
println!("{ret}", ret = ret.join(", "));
}
let end: DateTime<Local> = Local::now().trunc_subsecs(0);
if (end - begin) > Duration::seconds(5) {
// reload configuration from the server
match ctx.load().await {
Ok(_) => {}
Err(err) => {
println!("error loading config: {err}");
}
}
};
}
}
_val = apimsg => {
continue;
}
_val = sl => {
continue;
}
};
}
}
@ -222,79 +232,3 @@ impl std::fmt::Debug for FileEvent {
write!(f, "{ie:?}", ie = self.inevent)
}
}
async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}
Err(e) => {
println!("{e:?}")
}
};
match reqsocket.recv_string(0) {
Ok(o) => match o {
Ok(_) => {}
Err(ee) => {
println!("{ee:?}")
}
},
Err(e) => {
println!("{e:?}")
}
};
}
async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec<String>) {
ret.push(format!("host: {hostname}", hostname = ctx.hostname));
loop {
match push_ip(&ctx, &ip, ret).await {
Ok(_) => {
break;
}
Err(err) => {
println!("{err}");
sleep_s(1);
}
};
}
}
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
let ctx = ctx.lock().await;
let prefix = format!(
"{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
drop(ctx);
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(e) => {
println!("{e:?}");
None
}
},
Err(e) => {
println!("{e:?}");
None
}
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpData = serde_json::from_str(msg).unwrap();
if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() {
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}

View File

@ -1,8 +1,10 @@
mod api;
mod config;
mod fw;
mod ip;
mod ipblc;
mod utils;
mod ws;
mod zmqcom;
#[tokio::main]

18
src/ws.rs Normal file
View File

@ -0,0 +1,18 @@
use crate::config::Context;
use crate::ip::{push_ip, IpData};
use crate::utils::sleep_s;
pub async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec<String>) {
ret.push(format!("host: {hostname}", hostname = ctx.hostname));
loop {
match push_ip(&ctx, &ip, ret).await {
Ok(_) => {
break;
}
Err(err) => {
println!("{err}");
sleep_s(1);
}
};
}
}

View File

@ -1,8 +1,17 @@
use crate::config::ZMQ;
use crate::config::{Context, ZMQ};
use crate::ip::IpData;
use crate::utils::gethostname;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
const ZMQPROTO: &str = "tcp";
pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result<zmq::Socket, zmq::Error> {
pub async fn zconnect<'a>(
zmqcfg: &ZMQ,
zmqtype: zmq::SocketType,
) -> Result<zmq::Socket, zmq::Error> {
let zctx = zmq::Context::new();
let zmqhost = &zmqcfg.hostname;
let zmqport = &zmqcfg.port;
@ -11,3 +20,83 @@ pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result<zmq::Soc
socket.connect(&connectstring.as_str())?;
Ok(socket)
}
pub async fn zmqinit(ctx: &Arc<Mutex<Context>>, ipdatatx: &Sender<IpData>) -> zmq::Socket {
let ctxarc = Arc::clone(&ctx);
let zmqreqsocket;
let zmqsubsocket;
{
let zmqctx = ctxarc.lock().await;
zmqreqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await
.unwrap();
zmqsubsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
.await
.unwrap();
}
listenpubsub(&ctx, ipdatatx.clone(), zmqsubsocket).await;
return zmqreqsocket;
}
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
let ctx = ctx.lock().await;
let prefix = format!(
"{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
drop(ctx);
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(e) => {
println!("{e:?}");
None
}
},
Err(e) => {
println!("{e:?}");
None
}
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpData = serde_json::from_str(msg).unwrap();
if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() {
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}
pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}
Err(e) => {
println!("{e:?}")
}
};
match reqsocket.recv_string(0) {
Ok(o) => match o {
Ok(_) => {}
Err(ee) => {
println!("{ee:?}")
}
},
Err(e) => {
println!("{e:?}")
}
};
}