update tcp api server

This commit is contained in:
Paul 2023-01-08 21:16:06 +01:00
parent 5375446303
commit dda5f09831
8 changed files with 144 additions and 107 deletions

View File

@ -45,6 +45,8 @@ Options:
- ✅ Handle zeromq data transfer - ✅ Handle zeromq data transfer
- ✅ Code optimizations (WIP) - ✅ Code optimizations (WIP)
- ✅ Error handing when fetching config - ✅ Error handing when fetching config
- ❌ Local bound tcp api socket
- ❌ ZMQ -> MQTT/Websocket ?
### Notes ### Notes

View File

@ -1,13 +1,13 @@
use crate::config::Context; use crate::config::Context;
use serde_json;
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::TcpSocket; use tokio::net::TcpSocket;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex; use tokio::sync::Mutex;
pub async fn apiserver(ctxarc: &Arc<Mutex<Context>>, apitx: Sender<String>) -> io::Result<()> { pub async fn apiserver(ctxarc: &Arc<Mutex<Context>>) -> io::Result<()> {
let ctxclone = Arc::clone(ctxarc); let ctxclone = Arc::clone(ctxarc);
let ctx = ctxclone.lock().await; let ctx = ctxclone.lock().await;
let addr = ctx.cfg.api.parse().unwrap(); let addr = ctx.cfg.api.parse().unwrap();
@ -15,24 +15,41 @@ pub async fn apiserver(ctxarc: &Arc<Mutex<Context>>, apitx: Sender<String>) -> i
let socket = TcpSocket::new_v4().unwrap(); let socket = TcpSocket::new_v4().unwrap();
socket.bind(addr).unwrap(); socket.bind(addr).unwrap();
socket.set_reuseaddr(true).unwrap();
let listener = socket.listen(1024).unwrap(); let listener = socket.listen(1024).unwrap();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
//apitx.send(String::from("")).await.unwrap();
match listener.accept().await { match listener.accept().await {
Ok((mut socket, addr)) => { Ok((stream, _addr)) => {
//let mut buf = [0; 1024];
let data;
{
let ctx = ctxclone.lock().await; let ctx = ctxclone.lock().await;
let (reader, mut writer) = socket.split(); data = serde_json::to_string(&ctx.blocklist);
apitx.send(String::from("")).await.unwrap(); }
let msg = format!("{:?}", ctx.blocklist.len());
writer match data {
.write_all(format!("{msg}\r\n").as_bytes()) Ok(dt) => {
.await let (_reader, mut writer) = stream.into_split();
.unwrap(); match writer.write_all(format!("{dt}").as_bytes()).await {
writer.shutdown().await.unwrap(); Ok(a) => {
socket.shutdown().await.unwrap(); println!("{a:?}");
}
Err(err) => {
println!("{err}");
}
}
}
Err(err) => {
println!("unable to serialize data: {err}");
}
}
}
Err(err) => {
println!("couldn't get client: {}", err)
} }
Err(e) => println!("couldn't get client: {:?}", e),
} }
} }
}); });

View File

@ -89,7 +89,7 @@ impl Context {
} }
Err(err) => { Err(err) => {
println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs"); println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs");
sleep_s(CONFIG_RETRY); sleep_s(CONFIG_RETRY).await;
} }
} }
} }
@ -207,6 +207,13 @@ impl Context {
// nightly, future use // nightly, future use
//let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate) //let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate)
for (ip, blocked) in self.blocklist.clone().iter() { for (ip, blocked) in self.blocklist.clone().iter() {
match self.cfg.sets.get(&blocked.ipdata.src) {
Some(set) => {
let mut block = self.blocklist.get_mut(ip).unwrap();
block.blocktime = set.blocktime.clone();
}
None => {}
}
let mindate = now - Duration::minutes(blocked.blocktime); let mindate = now - Duration::minutes(blocked.blocktime);
if blocked.starttime < mindate { if blocked.starttime < mindate {
self.blocklist.remove(&ip.clone()).unwrap(); self.blocklist.remove(&ip.clone()).unwrap();

View File

@ -1,15 +1,13 @@
use crate::ip::IpData; use crate::ip::IpData;
use crate::ipblc::PKG_NAME;
use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table}; use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
use std::{ffi::CString, io::Error, net::Ipv4Addr}; use std::{ffi::CString, io::Error, net::Ipv4Addr};
pub fn init(tablename: &String) -> (Batch, Table) { pub fn fwinit() -> (Batch, Table) {
let mut batch = Batch::new(); let mut batch = Batch::new();
let table = Table::new( let table = Table::new(&CString::new(PKG_NAME).unwrap(), ProtoFamily::Ipv4);
&CString::new(tablename.as_str()).unwrap(),
ProtoFamily::Ipv4,
);
batch.add(&table, nftnl::MsgType::Add); batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del); batch.add(&table, nftnl::MsgType::Del);
@ -18,18 +16,17 @@ pub fn init(tablename: &String) -> (Batch, Table) {
(batch, table) (batch, table)
} }
pub fn block( pub fn fwblock(
tablename: &String,
ips_add: &Vec<IpData>, ips_add: &Vec<IpData>,
ret: &mut Vec<String>, ret: &mut Vec<String>,
fwlen: &mut usize, fwlen: &mut usize,
) -> std::result::Result<(), Error> { ) -> std::result::Result<(), Error> {
// convert chain // convert chain
let ips_add = convert(ips_add); let ips_add = convert(ips_add);
let (mut batch, table) = init(tablename); let (mut batch, table) = fwinit();
// build chain // build chain
let mut chain = Chain::new(&CString::new(tablename.as_str()).unwrap(), &table); let mut chain = Chain::new(&CString::new(PKG_NAME).unwrap(), &table);
chain.set_hook(nftnl::Hook::In, 1); chain.set_hook(nftnl::Hook::In, 1);
chain.set_policy(nftnl::Policy::Accept); chain.set_policy(nftnl::Policy::Accept);

View File

@ -1,6 +1,6 @@
use crate::api::apiserver; use crate::api::apiserver;
use crate::config::{Context, GIT_VERSION}; use crate::config::{Context, GIT_VERSION};
use crate::fw::{block, init}; use crate::fw::{fwblock, fwinit};
use crate::ip::{filter, IpData}; use crate::ip::{filter, IpData};
use crate::utils::read_lines; use crate::utils::read_lines;
use crate::ws::send_to_ipbl_ws; use crate::ws::send_to_ipbl_ws;
@ -15,36 +15,38 @@ use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex; use tokio::sync::Mutex;
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
const BL_CHAN_SIZE: usize = 32; const BL_CHAN_SIZE: usize = 32;
const ZMQ_CHAN_SIZE: usize = 64; const ZMQ_CHAN_SIZE: usize = 64;
const API_CHAN_SIZE: usize = 64; const API_CHAN_SIZE: usize = 64;
pub async fn run() { pub async fn run() {
let ctx = Arc::new(Mutex::new(Context::new().await)); let ctxarc = Arc::new(Mutex::new(Context::new().await));
println!(
"Launching {}, version {}",
env!("CARGO_PKG_NAME"),
format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION)
);
let (apitx, mut apirx): (Sender<String>, Receiver<String>) = channel(API_CHAN_SIZE); let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
println!("Launching {}, version {}", PKG_NAME, pkgversion);
let (_apitx, mut apirx): (Sender<String>, Receiver<String>) = channel(API_CHAN_SIZE);
//let tcpsocket = apiinit(&ctx, &apitx).await; //let tcpsocket = apiinit(&ctx, &apitx).await;
apiserver(&ctx, apitx).await.unwrap(); let ctxclone = Arc::clone(&ctxarc);
apiserver(&ctxclone).await.unwrap();
// initialize the firewall table // initialize the firewall table
init(&env!("CARGO_PKG_NAME").to_string()); fwinit();
let mut fwlen: usize = 0; let mut fwlen: usize = 0;
// initialize zeromq sockets // initialize zeromq sockets
let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE); let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE);
let zmqreqsocket = zmqinit(&ctx, &ipdatatx).await; let zmqreqsocket = zmqinit(&ctxclone, &ipdatatx).await;
let mut blrx = watchfiles(&ctx).await; let mut blrx = watchfiles(&ctxclone).await;
let ctxarc = Arc::clone(&ctx); let ctxclone = Arc::clone(&ctxarc);
tokio::spawn(async move { tokio::spawn(async move {
compare_files_changes(&ctxarc, &mut blrx, &ipdatatx).await; compare_files_changes(&ctxclone, &mut blrx, &ipdatatx).await;
}); });
let mut ip_init = IpData { let mut ip_init = IpData {
@ -58,22 +60,19 @@ pub async fn run() {
loop { loop {
let mut ret: Vec<String> = Vec::new(); let mut ret: Vec<String> = Vec::new();
let begin: DateTime<Local> = Local::now().trunc_subsecs(0);
// wait for logs parse and zmq channel receive // wait for logs parse and zmq channel receive
//let mut received_ip = ipdatarx.recv(); //let mut received_ip = ipdatarx.recv();
let r = ipdatarx.recv(); let ipdata_wait = ipdatarx.recv();
let apimsg_wait = apirx.recv();
let apimsg = apirx.recv(); let ctxclone = Arc::clone(&ctxarc);
let sl = tokio::time::sleep(tokio::time::Duration::from_millis(100));
tokio::select! { tokio::select! {
val = r => { val = ipdata_wait => {
let mut received_ip = val.unwrap(); let mut received_ip = val.unwrap();
// lock the context mutex
let ctxarc = Arc::clone(&ctx); let mut ctx = ctxclone.lock().await;
let mut ctx = ctxarc.lock().await;
if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() { if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() {
for ip_to_send in &mut ctx.get_blocklist_toblock().await { for ip_to_send in &mut ctx.get_blocklist_toblock().await {
@ -85,62 +84,64 @@ pub async fn run() {
// refresh context blocklist // refresh context blocklist
let filtered_ip = ctx.update_blocklist(&mut received_ip).await; let filtered_ip = ctx.update_blocklist(&mut received_ip).await;
ctx.gc_blocklist().await;
// send ip list to ws and zmq sockets // send ip list to ws and zmq sockets
if let Some(mut ip) = filtered_ip { if let Some(mut ip) = filtered_ip {
send_to_ipbl_ws(&ctx, &mut ip, &mut ret).await; send_to_ipbl_ws(&ctx, &mut ip, &mut ret).await;
send_to_ipbl_zmq(&zmqreqsocket, &mut ip).await; send_to_ipbl_zmq(&zmqreqsocket, &mut ip).await;
} }
}
_val = apimsg_wait => {
}
};
let toblock;
{
let mut ctx = ctxarc.lock().await;
ctx.gc_blocklist().await;
toblock = ctx.get_blocklist_toblock().await;
}
// apply firewall blocking // apply firewall blocking
block( fwblock(&toblock, &mut ret, &mut fwlen).unwrap();
&env!("CARGO_PKG_NAME").to_string(),
&ctx.get_blocklist_toblock().await,
&mut ret,
&mut fwlen,
)
.unwrap();
// log lines // log lines
if ret.len() > 0 { if ret.len() > 0 {
println!("{ret}", ret = ret.join(", ")); println!("{ret}", ret = ret.join(", "));
} }
let end: DateTime<Local> = Local::now().trunc_subsecs(0); {
if (end - begin) > Duration::seconds(5) { let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - last_cfg_reload) > Duration::seconds(5) {
// reload configuration from the server // reload configuration from the server
let mut ctx = ctxclone.lock().await;
match ctx.load().await { match ctx.load().await {
Ok(_) => {} Ok(_) => {
last_cfg_reload = Local::now().trunc_subsecs(0);
drop(ctx);
}
Err(err) => { Err(err) => {
println!("error loading config: {err}"); println!("error loading config: {err}");
} }
} }
}; };
} }
_val = apimsg => {
continue;
}
_val = sl => {
continue;
}
};
} }
} }
async fn watchfiles(ctx: &Arc<Mutex<Context>>) -> Receiver<FileEvent> { async fn watchfiles(ctxarc: &Arc<Mutex<Context>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE); let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
let ctx = Arc::clone(ctx); let ctxclone = Arc::clone(ctxarc);
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let events: Vec<InotifyEvent>; let events;
let instance;
{ {
let c = ctx.lock().await; let ctx = ctxclone.lock().await;
let instance = c.instance.clone(); instance = ctx.instance.clone();
drop(c);
events = instance.read_events().unwrap();
} }
events = instance.read_events().unwrap();
for inevent in events { for inevent in events {
let date: DateTime<Local> = Local::now().trunc_subsecs(0); let date: DateTime<Local> = Local::now().trunc_subsecs(0);
bltx.send(FileEvent { inevent, date }).await.unwrap(); bltx.send(FileEvent { inevent, date }).await.unwrap();
@ -163,7 +164,7 @@ async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, b
} }
async fn compare_files_changes( async fn compare_files_changes(
ctx: &Arc<Mutex<Context>>, ctxarc: &Arc<Mutex<Context>>,
inrx: &mut Receiver<FileEvent>, inrx: &mut Receiver<FileEvent>,
ipdatatx: &Sender<IpData>, ipdatatx: &Sender<IpData>,
) { ) {
@ -172,14 +173,20 @@ async fn compare_files_changes(
let modfiles = inrx.recv().await.unwrap(); let modfiles = inrx.recv().await.unwrap();
let mut iplist: Vec<IpData> = vec![]; let mut iplist: Vec<IpData> = vec![];
let mut ctx = ctx.lock().await; let sask;
let sas;
{
let ctx = ctxarc.lock().await;
sas = ctx.clone().sas;
sask = sas.keys();
tnets = ctx.cfg.build_trustnets(); tnets = ctx.cfg.build_trustnets();
}
match modfiles.inevent.name { match modfiles.inevent.name {
Some(name) => { Some(name) => {
let filename = name.to_str().unwrap(); let filename = name.to_str().unwrap();
for sak in &mut ctx.clone().sas.keys() { for sak in sask {
let sa = &mut ctx.sas.get_mut(sak).unwrap(); let sa = sas.get(sak).unwrap();
if modfiles.inevent.wd == sa.wd { if modfiles.inevent.wd == sa.wd {
let handle: String; let handle: String;
if sa.filename.as_str() == "" { if sa.filename.as_str() == "" {
@ -190,8 +197,13 @@ async fn compare_files_changes(
continue; continue;
} }
let (filesize, sizechanged) = let (filesize, sizechanged);
{
let mut ctx = ctxarc.lock().await;
let sa = ctx.sas.get_mut(sak).unwrap();
(filesize, sizechanged) =
get_last_file_size(&mut sa.watchedfiles, &handle).await; get_last_file_size(&mut sa.watchedfiles, &handle).await;
}
if !sizechanged { if !sizechanged {
continue; continue;

View File

@ -5,7 +5,7 @@ use std::boxed::Box;
use std::fs::File; use std::fs::File;
use std::io::*; use std::io::*;
use std::path::Path; use std::path::Path;
use std::time::Duration; use tokio::time::{sleep, Duration};
lazy_static! { lazy_static! {
static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap(); static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap();
@ -33,12 +33,12 @@ pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
list.len() list.len()
} }
pub fn _sleep_ms(ms: u64) { pub async fn sleep_ms(ms: u64) {
std::thread::sleep(Duration::from_millis(ms)); sleep(Duration::from_millis(ms)).await;
} }
pub fn sleep_s(s: u64) { pub async fn sleep_s(s: u64) {
std::thread::sleep(Duration::from_secs(s)); sleep(Duration::from_secs(s)).await;
} }
pub fn gethostname(show_fqdn: bool) -> String { pub fn gethostname(show_fqdn: bool) -> String {

View File

@ -11,7 +11,7 @@ pub async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec<Strin
} }
Err(err) => { Err(err) => {
println!("{err}"); println!("{err}");
sleep_s(1); sleep_s(1).await;
} }
}; };
} }

View File

@ -41,15 +41,17 @@ pub async fn zmqinit(ctx: &Arc<Mutex<Context>>, ipdatatx: &Sender<IpData>) -> zm
} }
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) { async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
let prefix;
{
let ctx = ctx.lock().await; let ctx = ctx.lock().await;
let prefix = format!( prefix = format!(
"{sub} ", "{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
); );
socket socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes()) .set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription"); .expect("failed setting subscription");
drop(ctx); }
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {