Merge branch 'develop-websocket'
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Paul 2023-06-23 10:44:53 +02:00
commit a9e18cfcdd
17 changed files with 1256 additions and 950 deletions

View File

@ -12,7 +12,7 @@ steps:
image: rust:1 image: rust:1
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev - apt-get install -y libnftnl-dev libmnl-dev
- cargo build --verbose --all - cargo build --verbose --all
- cargo test --verbose --all - cargo test --verbose --all
volumes: volumes:
@ -28,7 +28,7 @@ steps:
image: rust:1 image: rust:1
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev - apt-get install -y libnftnl-dev libmnl-dev
- cargo build --release --verbose --all - cargo build --release --verbose --all
- cd target/release - cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
@ -77,7 +77,7 @@ steps:
image: rust:1 image: rust:1
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev - apt-get install -y libnftnl-dev libmnl-dev
- cargo build --verbose --all - cargo build --verbose --all
- cargo test --verbose --all - cargo test --verbose --all
volumes: volumes:
@ -93,7 +93,7 @@ steps:
image: rust:1 image: rust:1
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev - apt-get install -y libnftnl-dev libmnl-dev
- cargo build --release --verbose --all - cargo build --release --verbose --all
- cd target/release - cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc

921
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -10,32 +10,22 @@ repository = "https://git.paulbsd.com/paulbsd/ipblc"
[dependencies] [dependencies]
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.0", features = ["string"] } clap = { version = "4.2", features = ["string"] }
git-version = "0.3.5" git-version = "0.3"
ipnet = "2.7" ipnet = "2.7"
lazy_static = "1.4" lazy_static = "1.4"
mnl = "0.2" mnl = "0.2"
nftnl = "0.6" nftnl = "0.6"
nix = "0.26" nix = "0.26"
regex = "1.7" regex = "1.8"
reqwest = { version = "0.11", default-features = false, features = ["json","rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json","rustls-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
tokio = { version = "1.23", features = ["full"] } tokio = { version = "1.28", features = ["full", "sync"] }
zmq = "0.10" tungstenite = { version = "0.19", features = ["handshake","rustls-tls-native-roots"] }
# [target.aarch64-unknown-linux-gnu.dependencies] ## to optimize binary size (slow compile time)
# flate2 = { version = "1.0", features = ["cloudflare_zlib"] } #[profile.release]
#strip = true
# [target.aarch64-linux-android.dependencies] #lto = true
# flate2 = { version = "1.0", features = ["zlib"] } #opt-level = "z"
# [target.armv7-unknown-linux-gnueabihf.dependencies]
# flate2 = { version = "1.0", features = ["zlib"] }
# [target.x86_64-unknown-linux-gnu.dependencies]
# flate2 = { version = "1.0", features = ["cloudflare_zlib"] }
[profile.release]
debug = false
opt-level = 3

10
NOTES.md Normal file
View File

@ -0,0 +1,10 @@
# Notes
### Date formats
```
nginx: 2006-01-02T15:04:05+01:00
ssh: 2006-01-02T15:04:05.000000+01:00
openvpn: 2006-01-02 15:04:05
mail: 2006-01-02T15:04:05.000000+01:00
```

View File

@ -26,14 +26,15 @@ cargo build --release
### Usage ### Usage
``` ```
USAGE: ipblc is a tool that search and send attacking ip addresses to ipbl
ipblc [OPTIONS]
OPTIONS: Usage: ipblc [OPTIONS]
-d Enable debugging
-h, --help Print help information Options:
-s, --server <server> Sets a ipbl server [default: https://ipbl.paulbsd.com] -s, --server <server> Sets a http server [default: https://ipbl.paulbsd.com]
-V, --version Print version informatio -d Enable debugging
-h, --help Print help information
-V, --version Print version information
``` ```
### TODO ### TODO
@ -44,20 +45,19 @@ OPTIONS:
- ✅ Handle zeromq data transfer - ✅ Handle zeromq data transfer
- ✅ Code optimizations (WIP) - ✅ Code optimizations (WIP)
- ✅ Error handing when fetching config - ✅ Error handing when fetching config
- ✅ Local bound tcp api socket
- ✅ ZMQ -> Websocket
- ✅ Bug in RwLocks (agent often give up)
### Date formats
``` ### Notes
nginx: 2006-01-02T15:04:05+01:00
ssh: 2006-01-02T15:04:05.000000+01:00 See [here](NOTES.md)
openvpn: 2006-01-02 15:04:05
mail: 2006-01-02T15:04:05.000000+01:00
```
## License ## License
```text ```text
Copyright (c) 2021, 2022 PaulBSD Copyright (c) 2021, 2022, 2023 PaulBSD
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without Redistribution and use in source and binary forms, with or without

106
old/zmqcom.rs Normal file
View File

@ -0,0 +1,106 @@
use crate::config::{Context, ZMQCfg};
use crate::ip::IpEvent;
use crate::utils::gethostname;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
const ZMQPROTO: &str = "tcp";
pub async fn zconnect<'a>(
zmqcfg: &ZMQCfg,
zmqtype: zmq::SocketType,
) -> Result<zmq::Socket, zmq::Error> {
let zctx = zmq::Context::new();
let zmqhost = &zmqcfg.hostname;
let zmqport = &zmqcfg.port;
let socket = zctx.socket(zmqtype).unwrap();
let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}");
socket.connect(&connectstring.as_str())?;
Ok(socket)
}
pub async fn zmqinit(ctx: &Arc<RwLock<Context>>, ipeventtx: &Sender<IpEvent>) -> zmq::Socket {
let ctxarc = Arc::clone(&ctx);
let zmqreqsocket;
let zmqsubsocket;
{
let zmqctx = ctxarc.read().await;
zmqreqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await
.unwrap();
zmqsubsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
.await
.unwrap();
}
listenpubsub(&ctx, ipeventtx.clone(), zmqsubsocket).await;
return zmqreqsocket;
}
async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpEvent>, socket: zmq::Socket) {
let prefix;
{
let ctx = ctx.read().await;
prefix = format!(
"{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
}
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(e) => {
println!("{e:?}");
None
}
},
Err(e) => {
println!("{e:?}");
None
}
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpEvent = serde_json::from_str(msg).unwrap();
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}
pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent, _ret: &mut Vec<String>) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}
Err(e) => {
println!("{e:?}")
}
};
match reqsocket.recv_string(0) {
Ok(o) => match o {
Ok(_) => {}
Err(ee) => {
println!("{ee:?}")
}
},
Err(e) => {
println!("{e:?}")
}
};
}

6
scripts/upgrade.sh Executable file
View File

@ -0,0 +1,6 @@
#!/bin/sh
git pull
cargo b --release
sudo systemctl stop ipblc
sudo cp target/release/ipblc /usr/local/apps/ipblc/ipblc
sudo systemctl start ipblc

View File

@ -1,4 +1,4 @@
use crate::ip::{BlockIpData, IpData}; use crate::ip::{BlockIpData, IpData, IpEvent};
use crate::utils::{gethostname, sleep_s}; use crate::utils::{gethostname, sleep_s};
use chrono::prelude::*; use chrono::prelude::*;
@ -16,17 +16,16 @@ use std::path::Path;
pub const GIT_VERSION: &str = git_version!(); pub const GIT_VERSION: &str = git_version!();
const MASTERSERVER: &str = "ipbl.paulbsd.com"; const MASTERSERVER: &str = "ipbl.paulbsd.com";
const ZMQSUBSCRIPTION: &str = "ipbl"; const WSSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY: u64 = 10; const CONFIG_RETRY: u64 = 1;
const WEB_CLIENT_TIMEOUT: i64 = 5;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Context { pub struct Context {
pub blocklist: HashMap<String, BlockIpData>, pub blocklist: HashMap<String, BlockIpData>,
pub cfg: Config, pub cfg: Config,
pub client: Client,
pub discovery: Discovery, pub discovery: Discovery,
pub flags: Flags, pub flags: Flags,
pub hostname: String,
pub instance: Box<Inotify>, pub instance: Box<Inotify>,
pub sas: HashMap<String, SetMap>, pub sas: HashMap<String, SetMap>,
pub hashwd: HashMap<String, WatchDescriptor>, pub hashwd: HashMap<String, WatchDescriptor>,
@ -49,10 +48,9 @@ pub struct Flags {
} }
impl Context { impl Context {
pub async fn new<'a>() -> Self { pub async fn new() -> Self {
// Get flags // Get flags
let argp: ArgMatches = Context::argparse(); let argp: ArgMatches = Context::argparse();
//let debug: bool = argp.contains_id("debug");
let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned(); let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned();
let server: String = argp.get_one::<String>("server").unwrap().to_string(); let server: String = argp.get_one::<String>("server").unwrap().to_string();
@ -60,43 +58,23 @@ impl Context {
let mut ctx = Context { let mut ctx = Context {
cfg: Config::new(), cfg: Config::new(),
flags: Flags { debug, server }, flags: Flags { debug, server },
hostname: gethostname(true),
discovery: Discovery { discovery: Discovery {
version: "1.0".to_string(), version: "1.0".to_string(),
urls: HashMap::new(), urls: HashMap::new(),
}, },
client: Client::builder()
.user_agent(format!(
"{}/{}@{}/{}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
GIT_VERSION,
gethostname(false)
))
.build()
.unwrap(),
sas: HashMap::new(), sas: HashMap::new(),
instance: Box::new(Inotify::init(InitFlags::empty()).unwrap()), instance: Box::new(Inotify::init(InitFlags::empty()).unwrap()),
blocklist: HashMap::new(), blocklist: HashMap::new(),
hashwd: HashMap::new(), hashwd: HashMap::new(),
}; };
loop { print!("Loading config ... ");
print!("Loading config ... "); ctx.load().await.unwrap();
match ctx.load().await {
Ok(_) => {
break;
}
Err(err) => {
println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs");
sleep_s(CONFIG_RETRY);
}
}
}
ctx ctx
} }
pub fn argparse<'a>() -> ArgMatches { pub fn argparse() -> ArgMatches {
Command::new(env!("CARGO_PKG_NAME")) Command::new(env!("CARGO_PKG_NAME"))
.version(format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION)) .version(format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION))
.author(env!("CARGO_PKG_AUTHORS")) .author(env!("CARGO_PKG_AUTHORS"))
@ -118,9 +96,9 @@ impl Context {
.get_matches() .get_matches()
} }
#[allow(dead_code)]
pub async fn discovery(&self) -> Result<Discovery, ReqError> { pub async fn discovery(&self) -> Result<Discovery, ReqError> {
let resp: Result<Response, ReqError> = self let resp: Result<Response, ReqError> = httpclient()
.client
.get(format!("{server}/discovery", server = self.flags.server)) .get(format!("{server}/discovery", server = self.flags.server))
.send() .send()
.await; .await;
@ -139,9 +117,30 @@ impl Context {
if cfg!(test) { if cfg!(test) {
return Ok(()); return Ok(());
} }
self.discovery = self.discovery().await?; let mut last_in_err = false;
self.cfg.load(self.to_owned()).await?; loop {
let res = self.cfg.load(&self.flags.server).await;
match res {
Ok(()) => {
if last_in_err {
println!("loaded config");
}
break;
}
Err(err) => {
println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs");
last_in_err = true;
sleep_s(CONFIG_RETRY).await;
}
};
}
if last_in_err {
println!("creating sas");
}
self.create_sas().await?; self.create_sas().await?;
if last_in_err {
println!("created sas");
}
Ok(()) Ok(())
} }
@ -156,43 +155,49 @@ impl Context {
pub async fn get_blocklist_toblock(&mut self) -> Vec<IpData> { pub async fn get_blocklist_toblock(&mut self) -> Vec<IpData> {
let mut res: Vec<IpData> = vec![]; let mut res: Vec<IpData> = vec![];
let now: DateTime<Local> = Local::now().trunc_subsecs(0);
for (_, block) in self.blocklist.iter_mut() { for (_, block) in self.blocklist.iter_mut() {
let set = self.cfg.sets.get(&block.ipdata.src.to_string()).unwrap(); match self.cfg.sets.get(&block.ipdata.src) {
if block.tryfail >= set.tryfail { Some(set) => {
res.push(block.ipdata.clone()); if block.tryfail >= set.tryfail {
if block.tryfail == set.tryfail { res.push(block.ipdata.clone());
block.starttime = DateTime::from(now); }
} }
None => {}
} }
} }
res res
} }
pub async fn update_blocklist(&mut self, ipdata: &mut IpData) -> Option<IpData> { pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> {
match self.cfg.sets.get(&ipdata.src) { match self.cfg.sets.get(&ipevent.ipdata.src) {
Some(set) => { Some(set) => {
if self.blocklist.contains_key(&ipdata.ip) let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str())
&& self.hostname == ipdata.hostname .unwrap()
&& ipdata.mode == "file".to_string() .with_timezone(&chrono::Local);
{ let blocktime = set.blocktime;
let mut block = self.blocklist.get_mut(&ipdata.ip).unwrap(); if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
let block = self
.blocklist
.entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipevent.ipdata.clone(),
tryfail: 0,
starttime,
blocktime,
});
block.tryfail += 1; block.tryfail += 1;
block.blocktime = set.blocktime; block.blocktime = blocktime;
if block.tryfail >= set.tryfail { if block.tryfail >= set.tryfail {
return Some(block.ipdata.clone()); return Some(ipevent.clone());
} }
} else { } else {
let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str())
.unwrap()
.with_timezone(&chrono::Local);
self.blocklist self.blocklist
.entry(ipdata.ip.to_string()) .entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData { .or_insert(BlockIpData {
ipdata: ipdata.clone(), ipdata: ipevent.ipdata.clone(),
tryfail: 100, tryfail: set.tryfail,
starttime, starttime,
blocktime: set.blocktime, blocktime,
}); });
} }
} }
@ -205,8 +210,15 @@ impl Context {
let mut removed: Vec<IpData> = vec![]; let mut removed: Vec<IpData> = vec![];
let now: DateTime<Local> = Local::now().trunc_subsecs(0); let now: DateTime<Local> = Local::now().trunc_subsecs(0);
// nightly, future use // nightly, future use
//let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate) // let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate)
for (ip, blocked) in self.blocklist.clone().iter() { for (ip, blocked) in self.blocklist.clone().iter() {
/*match self.cfg.sets.get(&blocked.ipdata.src) {
Some(set) => {
let mut block = self.blocklist.get_mut(ip).unwrap();
block.blocktime = set.blocktime.clone();
}
None => {}
}*/
let mindate = now - Duration::minutes(blocked.blocktime); let mindate = now - Duration::minutes(blocked.blocktime);
if blocked.starttime < mindate { if blocked.starttime < mindate {
self.blocklist.remove(&ip.clone()).unwrap(); self.blocklist.remove(&ip.clone()).unwrap();
@ -273,7 +285,8 @@ pub struct Config {
pub sets: HashMap<String, Set>, pub sets: HashMap<String, Set>,
#[serde(skip_serializing)] #[serde(skip_serializing)]
pub trustnets: Vec<String>, pub trustnets: Vec<String>,
pub zmq: HashMap<String, ZMQ>, pub ws: HashMap<String, WebSocketCfg>,
pub api: String,
} }
impl Config { impl Config {
@ -323,34 +336,59 @@ impl Config {
"172.16.0.0/12".to_string(), "172.16.0.0/12".to_string(),
"192.168.0.0/16".to_string(), "192.168.0.0/16".to_string(),
], ],
zmq: HashMap::from([("pubsub".to_string(),ZMQ{ ws: HashMap::from([("pubsub".to_string(),WebSocketCfg{
t: "pubsub".to_string(), t: "pubsub".to_string(),
hostname: MASTERSERVER.to_string(), endpoint: format!("wss://{}/wsps", MASTERSERVER.to_string()),
port: 9999, subscription: WSSUBSCRIPTION.to_string(),
subscription: ZMQSUBSCRIPTION.to_string(), }),("reqrep".to_string(), WebSocketCfg {
}),("reqrep".to_string(),ZMQ {
t: "reqrep".to_string(), t: "reqrep".to_string(),
hostname: MASTERSERVER.to_string(), endpoint: format!("wss://{}/wsrr", MASTERSERVER.to_string()),
port: 9998, subscription: WSSUBSCRIPTION.to_string(),
subscription: String::new(), })]),
})]) api: String::from("127.0.0.1:8060")
} }
} }
pub async fn load(&mut self, ctx: Context) -> Result<(), ReqError> { pub async fn load(&mut self, server: &String) -> Result<(), ReqError> {
self.get_trustnets(&ctx).await?; self.get_global_config(server).await?;
self.get_sets(&ctx).await?; self.get_trustnets(server).await?;
self.get_zmq_config(&ctx).await?; self.get_sets(server).await?;
self.get_ws_config(server).await?;
Ok(()) Ok(())
} }
async fn get_trustnets(&mut self, ctx: &Context) -> Result<(), ReqError> { async fn get_global_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx let resp: Result<Response, ReqError> =
.client httpclient().get(format!("{server}/config")).send().await;
.get(format!( let req = match resp {
"{server}/config/trustlist", Ok(re) => re,
server = ctx.flags.server Err(err) => return Err(err),
)) };
let data: HashMap<String, GlobalConfig> = match req.json::<Vec<GlobalConfig>>().await {
Ok(res) => {
let mut out: HashMap<String, GlobalConfig> = HashMap::new();
res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.key.to_string(), x);
});
out
}
Err(err) => return Err(err),
};
let key = "".to_string();
self.api = data
.get(&key.to_string())
.unwrap_or(&GlobalConfig {
key: "api".to_string(),
value: "127.0.0.1:8060".to_string(),
})
.value
.clone();
Ok(())
}
async fn get_trustnets(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/config/trustlist"))
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
@ -365,10 +403,9 @@ impl Config {
Ok(()) Ok(())
} }
async fn get_sets(&mut self, ctx: &Context) -> Result<(), ReqError> { async fn get_sets(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx let resp: Result<Response, ReqError> = httpclient()
.client .get(format!("{server}/config/sets"))
.get(format!("{server}/config/sets", server = ctx.flags.server))
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
@ -385,19 +422,16 @@ impl Config {
Ok(()) Ok(())
} }
async fn get_zmq_config(&mut self, ctx: &Context) -> Result<(), ReqError> { async fn get_ws_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx let resp: Result<Response, ReqError> =
.client httpclient().get(format!("{server}/config/ws")).send().await;
.get(format!("{server}/config/zmq", server = ctx.flags.server))
.send()
.await;
let req = match resp { let req = match resp {
Ok(re) => re, Ok(re) => re,
Err(err) => return Err(err), Err(err) => return Err(err),
}; };
let data: HashMap<String, ZMQ> = match req.json::<Vec<ZMQ>>().await { let data: HashMap<String, WebSocketCfg> = match req.json::<Vec<WebSocketCfg>>().await {
Ok(res) => { Ok(res) => {
let mut out: HashMap<String, ZMQ> = HashMap::new(); let mut out: HashMap<String, WebSocketCfg> = HashMap::new();
res.into_iter().map(|x| x).for_each(|x| { res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.t.to_string(), x); out.insert(x.t.to_string(), x);
}); });
@ -405,7 +439,7 @@ impl Config {
} }
Err(err) => return Err(err), Err(err) => return Err(err),
}; };
self.zmq = data; self.ws = data;
Ok(()) Ok(())
} }
@ -421,6 +455,41 @@ impl Config {
} }
trustnets trustnets
} }
pub fn bootstrap_event(&self) -> IpEvent {
IpEvent {
msgtype: String::from("bootstrap"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: IpData {
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
},
}
}
}
pub fn httpclient() -> Client {
let client = Client::builder()
.user_agent(format!(
"{}/{}@{}/{}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
GIT_VERSION,
gethostname(false)
))
.timeout(Duration::seconds(WEB_CLIENT_TIMEOUT).to_std().unwrap())
.build()
.unwrap();
client
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct GlobalConfig {
pub key: String,
pub value: String,
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
@ -434,11 +503,10 @@ pub struct Set {
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ZMQ { pub struct WebSocketCfg {
#[serde(rename = "type")] #[serde(rename = "type")]
pub t: String, pub t: String,
pub hostname: String, pub endpoint: String,
pub port: i64,
pub subscription: String, pub subscription: String,
} }
@ -478,51 +546,71 @@ mod test {
ctx.blocklist = HashMap::new(); ctx.blocklist = HashMap::new();
for _i in 0..10 { for _i in 0..10 {
ctx.update_blocklist(&mut IpData { ctx.update_blocklist(&mut IpEvent {
ip: "1.1.1.1".to_string(), msgtype: String::from("add"),
hostname: "test1".to_string(), mode: String::from("ws"),
date: now.to_rfc3339().to_string(), hostname: String::from("localhost"),
src: "ssh".to_string(), ipdata: IpData {
mode: "file".to_string(), ip: "1.1.1.1".to_string(),
hostname: "test1".to_string(),
date: now.to_rfc3339().to_string(),
src: "ssh".to_string(),
},
}) })
.await; .await;
} }
for _ in 0..10 { for _ in 0..10 {
ctx.update_blocklist(&mut IpData { ctx.update_blocklist(&mut IpEvent {
ip: "1.1.1.2".to_string(), msgtype: String::from("add"),
hostname: "test2".to_string(), mode: String::from("ws"),
date: now.to_rfc3339().to_string(), hostname: String::from("localhost"),
src: "http".to_string(), ipdata: IpData {
mode: "file".to_string(), ip: "1.1.1.2".to_string(),
hostname: "test2".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
},
}) })
.await; .await;
} }
ctx.update_blocklist(&mut IpData { ctx.update_blocklist(&mut IpEvent {
ip: "1.1.1.3".to_string(), msgtype: String::from("add"),
hostname: "testgood".to_string(), mode: String::from("ws"),
date: now.to_rfc3339().to_string(), hostname: String::from("localhost"),
src: "http".to_string(), ipdata: IpData {
mode: "file".to_string(), ip: "1.1.1.3".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
},
}) })
.await; .await;
ctx.update_blocklist(&mut IpData { ctx.update_blocklist(&mut IpEvent {
ip: "1.1.1.4".to_string(), msgtype: String::from("add"),
hostname: "testgood".to_string(), mode: String::from("ws"),
date: now.to_rfc3339().to_string(), hostname: String::from("localhost"),
src: "http".to_string(), ipdata: IpData {
mode: "file".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
},
}) })
.await; .await;
ctx.update_blocklist(&mut IpData { ctx.update_blocklist(&mut IpEvent {
ip: "1.1.1.4".to_string(), msgtype: String::from("add"),
hostname: "testgood".to_string(), mode: String::from("ws"),
date: now.to_rfc3339().to_string(), hostname: String::from("localhost"),
src: "http".to_string(), ipdata: IpData {
mode: "file".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
},
}) })
.await; .await;

View File

@ -1,15 +1,13 @@
use crate::ip::IpData; use crate::ip::IpData;
use crate::ipblc::PKG_NAME;
use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table}; use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
use std::{ffi::CString, io::Error, net::Ipv4Addr}; use std::{ffi::CString, io::Error, net::Ipv4Addr};
pub fn init(tablename: &String) -> (Batch, Table) { pub fn fwinit() -> (Batch, Table) {
let mut batch = Batch::new(); let mut batch = Batch::new();
let table = Table::new( let table = Table::new(&CString::new(PKG_NAME).unwrap(), ProtoFamily::Ipv4);
&CString::new(tablename.as_str()).unwrap(),
ProtoFamily::Ipv4,
);
batch.add(&table, nftnl::MsgType::Add); batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del); batch.add(&table, nftnl::MsgType::Del);
@ -18,18 +16,17 @@ pub fn init(tablename: &String) -> (Batch, Table) {
(batch, table) (batch, table)
} }
pub fn block( pub fn fwblock(
tablename: &String,
ips_add: &Vec<IpData>, ips_add: &Vec<IpData>,
ret: &mut Vec<String>, ret: &mut Vec<String>,
fwlen: &mut usize, fwlen: &mut usize,
) -> std::result::Result<(), Error> { ) -> std::result::Result<(), Error> {
// convert chain // convert chain
let ips_add = convert(ips_add); let ips_add = convert(ips_add);
let (mut batch, table) = init(tablename); let (mut batch, table) = fwinit();
// build chain // build chain
let mut chain = Chain::new(&CString::new(tablename.as_str()).unwrap(), &table); let mut chain = Chain::new(&CString::new(PKG_NAME).unwrap(), &table);
chain.set_hook(nftnl::Hook::In, 1); chain.set_hook(nftnl::Hook::In, 1);
chain.set_policy(nftnl::Policy::Accept); chain.set_policy(nftnl::Policy::Accept);

107
src/ip.rs
View File

@ -1,4 +1,4 @@
use crate::config::Context; use crate::config::httpclient;
use crate::utils::gethostname; use crate::utils::gethostname;
use chrono::prelude::*; use chrono::prelude::*;
@ -18,13 +18,20 @@ lazy_static! {
static ref R_DATE: Regex = Regex::new(include_str!("regexps/date.txt")).unwrap(); static ref R_DATE: Regex = Regex::new(include_str!("regexps/date.txt")).unwrap();
} }
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IpEvent {
pub msgtype: String,
pub mode: String,
pub hostname: String,
pub ipdata: IpData,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq)] #[derive(Clone, Debug, Serialize, Deserialize, Eq)]
pub struct IpData { pub struct IpData {
pub ip: String, pub ip: String,
pub src: String, pub src: String,
pub date: String, pub date: String,
pub hostname: String, pub hostname: String,
pub mode: String,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -60,89 +67,18 @@ impl Display for IpData {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!( write!(
f, f,
"ip: {ip}, src: {src}, date: {date}, hostname: {hostname}, mode: {mode}", "ip: {ip}, src: {src}, date: {date}, hostname: {hostname}",
ip = self.ip, ip = self.ip,
src = self.src, src = self.src,
date = self.date, date = self.date,
hostname = self.hostname, hostname = self.hostname,
mode = self.mode,
) )
} }
} }
pub async fn push_ip(ctx: &Context, ip: &IpData, ret: &mut Vec<String>) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
data.push(IpData {
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
mode: "file".to_string(),
});
let resp = ctx
.client
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}
pub async fn _push_ip_bulk(
ctx: &Context,
ips: &Vec<IpData>,
ret: &mut Vec<String>,
) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
for ip in ips {
data.push(IpData {
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
mode: "file".to_string(),
})
}
let resp = ctx
.client
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}
pub fn filter( pub fn filter(
lines: Box<dyn Read>, lines: Box<dyn Read>,
list: &mut Vec<IpData>, iplist: &mut Vec<IpData>,
trustnets: &Vec<IpNet>, trustnets: &Vec<IpNet>,
regex: &Regex, regex: &Regex,
src: &String, src: &String,
@ -193,12 +129,11 @@ pub fn filter(
}; };
if !is_trusted(&ipaddr, &trustnets) { if !is_trusted(&ipaddr, &trustnets) {
list.push(IpData { iplist.push(IpData {
ip: s_ipaddr, ip: s_ipaddr,
src: src.to_owned(), src: src.to_owned(),
date: s_date.to_rfc3339().to_owned(), date: s_date.to_rfc3339().to_owned(),
hostname: hostname.to_owned(), hostname: hostname.to_owned(),
mode: "file".to_owned(),
}); });
ips += 1; ips += 1;
}; };
@ -222,8 +157,15 @@ fn parse_date(input: regex::Captures) -> DateTime<Local> {
} }
let date = Local let date = Local
.ymd(ymd[0] as i32, ymd[1] as u32, ymd[2] as u32) .with_ymd_and_hms(
.and_hms(hms[0] as u32, hms[1] as u32, hms[2] as u32); ymd[0] as i32,
ymd[1] as u32,
ymd[2] as u32,
hms[0] as u32,
hms[1] as u32,
hms[2] as u32,
)
.unwrap();
date date
} }
@ -236,10 +178,9 @@ fn is_trusted(ip: &IpAddr, trustnets: &Vec<IpNet>) -> bool {
false false
} }
pub async fn _get_last(ctx: &Context) -> Result<Vec<IpData>, ReqError> { pub async fn _get_last(server: &String) -> Result<Vec<IpData>, ReqError> {
let resp = ctx let resp = httpclient()
.client .get(format!("{server}/ips/last"))
.get(format!("{server}/ips/last", server = ctx.flags.server))
.query(&[("interval", "3 hours")]) .query(&[("interval", "3 hours")])
.send() .send()
.await; .await;

View File

@ -1,8 +1,10 @@
use crate::config::{Context, GIT_VERSION}; use crate::config::{Context, GIT_VERSION};
use crate::fw::{block, init}; use crate::fw::{fwblock, fwinit};
use crate::ip::{filter, push_ip, IpData}; use crate::ip::{filter, IpData, IpEvent};
use crate::monitoring::apiserver;
use crate::utils::{gethostname, read_lines, sleep_s}; use crate::utils::{gethostname, read_lines, sleep_s};
use crate::zmqcom::zconnect; use crate::webservice::send_to_ipbl_api;
use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep};
use chrono::prelude::*; use chrono::prelude::*;
use chrono::prelude::{DateTime, Local}; use chrono::prelude::{DateTime, Local};
@ -11,126 +13,167 @@ use nix::sys::inotify::InotifyEvent;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex; use tokio::sync::RwLock;
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
const BL_CHAN_SIZE: usize = 32; const BL_CHAN_SIZE: usize = 32;
const ZMQ_CHAN_SIZE: usize = 64; const WS_CHAN_SIZE: usize = 64;
const LOOP_MAX_WAIT: u64 = 5;
pub async fn run() { pub async fn run() {
let ctx = Arc::new(Mutex::new(Context::new().await)); let globalctx = Context::new().await;
println!( let ctxarc = Arc::new(RwLock::new(globalctx));
"Launching {}, version {}",
env!("CARGO_PKG_NAME"),
format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION)
);
let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE);
// initialize the firewall table
init(&env!("CARGO_PKG_NAME").to_string());
let mut fwlen: usize = 0; let mut fwlen: usize = 0;
// initialize zeromq sockets let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let reqsocket;
let subsocket;
{
let ctxarc = Arc::clone(&ctx);
let zmqctx = ctxarc.lock().await;
reqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await
.unwrap();
subsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
.await
.unwrap();
}
listenpubsub(&ctx, ipdatatx.clone(), subsocket).await; let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
println!("Launching {}, version {}", PKG_NAME, pkgversion);
fwinit();
let mut blrx = watchfiles(&ctx).await; let ctxapi = Arc::clone(&ctxarc);
apiserver(&ctxapi).await.unwrap();
let ctxarc = Arc::clone(&ctx); // initialize sockets
let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(WS_CHAN_SIZE);
let ipeventtxarc = Arc::new(RwLock::new(ipeventtx));
// init pubsub
let ctxwsps = Arc::clone(&ctxarc);
let ipeventws = Arc::clone(&ipeventtxarc);
websocketpubsub(&ctxwsps, ipeventws).await;
let ctxwsrr = Arc::clone(&ctxarc);
let mut wssocketrr = websocketreqrep(&ctxwsrr).await;
// init file watcher
let mut blrx = watchfiles(&ctxarc).await;
let ctxclone = Arc::clone(&ctxarc);
let ipeventclone = Arc::clone(&ipeventtxarc);
tokio::spawn(async move { tokio::spawn(async move {
compare_files_changes(&ctxarc, &mut blrx, &ipdatatx).await; compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
}); });
let mut ip_init = IpData {
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
mode: "init".to_string(),
};
send_to_ipbl_zmq(&reqsocket, &mut ip_init).await;
loop { loop {
let mut ret: Vec<String> = Vec::new(); let mut ret: Vec<String> = Vec::new();
let begin: DateTime<Local> = Local::now().trunc_subsecs(0);
// wait for logs parse and zmq channel receive let ctxclone = Arc::clone(&ctxarc);
let mut received_ip = ipdatarx.recv().await.unwrap();
// lock the context mutex tokio::select! {
let ctxarc = Arc::clone(&ctx); ipevent = ipeventrx.recv() => {
let mut ctx = ctxarc.lock().await; let received_ip = ipevent.unwrap();
if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() { let (toblock,server);
for ip_to_send in &mut ctx.get_blocklist_toblock().await { {
ip_to_send.mode = "init".to_string(); let mut ctx = ctxclone.write().await;
send_to_ipbl_zmq(&reqsocket, ip_to_send).await; toblock = ctx.get_blocklist_toblock().await;
server = ctx.flags.server.clone();
}
if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in toblock {
let ipe = IpEvent{
msgtype: String::from("init"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: ip_to_send,
};
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr = websocketreqrep(&ctxwsrr).await;
break;
}
}
continue
}
// refresh context blocklist
let filtered_ipevent;
{
let mut ctx = ctxarc.write().await;
filtered_ipevent = ctx.update_blocklist(&received_ip).await;
}
// send ip list to api and ws sockets
if let Some(ipevent) = filtered_ipevent {
if received_ip.msgtype != "init" {
println!("sending {} to api and ws", ipevent.ipdata.ip);
let ipe = IpEvent{
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: ipevent.ipdata,
};
send_to_ipbl_api(&server.clone(), &ipe).await;
let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
if !status {
wssocketrr = websocketreqrep(&ctxwsrr).await;
continue;
}
}
}
} }
continue; _val = sleep_s(LOOP_MAX_WAIT) => {}
} };
// refresh context blocklist let ctxclone = Arc::clone(&ctxarc);
let filtered_ip = ctx.update_blocklist(&mut received_ip).await; handle_fwblock(ctxclone, &mut ret, &mut fwlen).await;
ctx.gc_blocklist().await;
// send ip list to ws and zmq sockets
if let Some(mut ip) = filtered_ip {
send_to_ipbl_ws(&ctx, &mut ip, &mut ret).await;
send_to_ipbl_zmq(&reqsocket, &mut ip).await;
}
// apply firewall blocking
block(
&env!("CARGO_PKG_NAME").to_string(),
&ctx.get_blocklist_toblock().await,
&mut ret,
&mut fwlen,
)
.unwrap();
// log lines // log lines
if ret.len() > 0 { if ret.len() > 0 {
println!("{ret}", ret = ret.join(", ")); println!("{ret}", ret = ret.join(", "));
} }
let end: DateTime<Local> = Local::now().trunc_subsecs(0); let ctxclone = Arc::clone(&ctxarc);
if (end - begin) > Duration::seconds(5) { handle_cfg_reload(&ctxclone, &mut last_cfg_reload).await;
// reload configuration from the server
match ctx.load().await {
Ok(_) => {}
Err(err) => {
println!("error loading config: {err}");
}
}
}
} }
} }
async fn watchfiles(ctx: &Arc<Mutex<Context>>) -> Receiver<FileEvent> { async fn handle_cfg_reload(ctxclone: &Arc<RwLock<Context>>, last_cfg_reload: &mut DateTime<Local>) {
let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) {
let mut ctx = ctxclone.write().await;
match ctx.load().await {
Ok(_) => {
*last_cfg_reload = Local::now().trunc_subsecs(0);
}
Err(_) => {
println!("error reloading config");
}
}
};
}
async fn handle_fwblock(ctxclone: Arc<RwLock<Context>>, ret: &mut Vec<String>, fwlen: &mut usize) {
let toblock = {
let mut ctx = ctxclone.write().await;
ctx.gc_blocklist().await;
ctx.get_blocklist_toblock().await
};
// apply firewall blocking
match fwblock(&toblock, ret, fwlen) {
Ok(_) => {}
Err(err) => {
println!("Err: {err}, unable to push firewall rules, use super user")
}
};
}
async fn watchfiles(ctxarc: &Arc<RwLock<Context>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE); let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
let ctx = Arc::clone(ctx); let ctxclone = Arc::clone(ctxarc);
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let events: Vec<InotifyEvent>; let events;
let instance;
{ {
let c = ctx.lock().await; let ctx = ctxclone.read().await;
let instance = c.instance.clone(); instance = ctx.instance.clone();
drop(c);
events = instance.read_events().unwrap();
} }
events = instance.read_events().unwrap();
for inevent in events { for inevent in events {
let date: DateTime<Local> = Local::now().trunc_subsecs(0); let date: DateTime<Local> = Local::now().trunc_subsecs(0);
bltx.send(FileEvent { inevent, date }).await.unwrap(); bltx.send(FileEvent { inevent, date }).await.unwrap();
@ -153,23 +196,29 @@ async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, b
} }
async fn compare_files_changes( async fn compare_files_changes(
ctx: &Arc<Mutex<Context>>, ctxarc: &Arc<RwLock<Context>>,
inrx: &mut Receiver<FileEvent>, inrx: &mut Receiver<FileEvent>,
ipdatatx: &Sender<IpData>, ipeventtx: &Arc<RwLock<Sender<IpEvent>>>,
) { ) {
let mut tnets; let mut tnets;
loop { loop {
let modfiles = inrx.recv().await.unwrap(); let modfiles = inrx.recv().await.unwrap();
let mut iplist: Vec<IpData> = vec![]; let mut iplist: Vec<IpData> = vec![];
let mut ctx = ctx.lock().await; let sask;
tnets = ctx.cfg.build_trustnets(); let sas;
{
let ctx = ctxarc.read().await;
sas = ctx.clone().sas;
sask = sas.keys();
tnets = ctx.cfg.build_trustnets();
}
match modfiles.inevent.name { match modfiles.inevent.name {
Some(name) => { Some(name) => {
let filename = name.to_str().unwrap(); let filename = name.to_str().unwrap();
for sak in &mut ctx.clone().sas.keys() { for sak in sask {
let sa = &mut ctx.sas.get_mut(sak).unwrap(); let sa = sas.get(sak).unwrap();
if modfiles.inevent.wd == sa.wd { if modfiles.inevent.wd == sa.wd {
let handle: String; let handle: String;
if sa.filename.as_str() == "" { if sa.filename.as_str() == "" {
@ -180,8 +229,13 @@ async fn compare_files_changes(
continue; continue;
} }
let (filesize, sizechanged) = let (filesize, sizechanged);
get_last_file_size(&mut sa.watchedfiles, &handle).await; {
let mut ctx = ctxarc.write().await;
let sa = ctx.sas.get_mut(sak).unwrap();
(filesize, sizechanged) =
get_last_file_size(&mut sa.watchedfiles, &handle).await;
}
if !sizechanged { if !sizechanged {
continue; continue;
@ -204,7 +258,14 @@ async fn compare_files_changes(
} }
} }
for ip in iplist { for ip in iplist {
ipdatatx.send(ip).await.unwrap(); let ipevent = IpEvent {
msgtype: String::from("add"),
hostname: gethostname(true),
mode: String::from("file"),
ipdata: ip,
};
let ipetx = ipeventtx.write().await;
ipetx.send(ipevent).await.unwrap();
} }
} }
None => {} None => {}
@ -222,79 +283,3 @@ impl std::fmt::Debug for FileEvent {
write!(f, "{ie:?}", ie = self.inevent) write!(f, "{ie:?}", ie = self.inevent)
} }
} }
async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}
Err(e) => {
println!("{e:?}")
}
};
match reqsocket.recv_string(0) {
Ok(o) => match o {
Ok(_) => {}
Err(ee) => {
println!("{ee:?}")
}
},
Err(e) => {
println!("{e:?}")
}
};
}
async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec<String>) {
ret.push(format!("host: {hostname}", hostname = ctx.hostname));
loop {
match push_ip(&ctx, &ip, ret).await {
Ok(_) => {
break;
}
Err(err) => {
println!("{err}");
sleep_s(1);
}
};
}
}
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
let ctx = ctx.lock().await;
let prefix = format!(
"{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
drop(ctx);
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(e) => {
println!("{e:?}");
None
}
},
Err(e) => {
println!("{e:?}");
None
}
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpData = serde_json::from_str(msg).unwrap();
if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() {
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}

View File

@ -2,11 +2,12 @@ mod config;
mod fw; mod fw;
mod ip; mod ip;
mod ipblc; mod ipblc;
mod monitoring;
mod utils; mod utils;
mod zmqcom; mod webservice;
mod websocket;
#[tokio::main] #[tokio::main]
pub async fn main() { pub async fn main() {
// Create a new context
ipblc::run().await; ipblc::run().await;
} }

57
src/monitoring.rs Normal file
View File

@ -0,0 +1,57 @@
use crate::config::Context;
use serde_json;
use std::io;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpSocket;
use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxarc = ctxarc.clone();
let addr;
{
let ctx = ctxarc.read().await;
addr = ctx.cfg.api.parse().unwrap();
}
let socket = TcpSocket::new_v4().unwrap();
socket.bind(addr).unwrap();
socket.set_reuseaddr(true).unwrap();
let listener = socket.listen(1024).unwrap();
tokio::spawn(async move {
loop {
//apitx.send(String::from("")).await.unwrap();
match listener.accept().await {
Ok((stream, _addr)) => {
//let mut buf = [0; 1024];
let data;
{
let ctx = ctxarc.read().await;
data = serde_json::to_string(&ctx.blocklist);
}
match data {
Ok(dt) => {
let (_reader, mut writer) = stream.into_split();
match writer.write_all(format!("{dt}").as_bytes()).await {
Ok(_) => {}
Err(err) => {
println!("{err}");
}
}
}
Err(err) => {
println!("unable to serialize data: {err}");
}
}
}
Err(err) => {
println!("couldn't get client: {}", err)
}
}
}
});
Ok(())
}

View File

@ -5,7 +5,7 @@ use std::boxed::Box;
use std::fs::File; use std::fs::File;
use std::io::*; use std::io::*;
use std::path::Path; use std::path::Path;
use std::time::Duration; use tokio::time::{sleep, Duration};
lazy_static! { lazy_static! {
static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap(); static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap();
@ -33,12 +33,12 @@ pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
list.len() list.len()
} }
pub fn _sleep_ms(ms: u64) { pub async fn _sleep_ms(ms: u64) {
std::thread::sleep(Duration::from_millis(ms)); sleep(Duration::from_millis(ms)).await;
} }
pub fn sleep_s(s: u64) { pub async fn sleep_s(s: u64) {
std::thread::sleep(Duration::from_secs(s)); sleep(Duration::from_secs(s)).await;
} }
pub fn gethostname(show_fqdn: bool) -> String { pub fn gethostname(show_fqdn: bool) -> String {

83
src/webservice.rs Normal file
View File

@ -0,0 +1,83 @@
use crate::config::{httpclient, Context};
use crate::ip::{IpData, IpEvent};
use crate::utils::sleep_s;
use reqwest::Client;
use reqwest::Error as ReqError;
const MAX_FAILED_API_RATE: u64 = 10;
pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut try_req = 0;
let client = httpclient();
loop {
match push_ip(&client, &server, &ip.ipdata).await {
Ok(_) => {
break;
}
Err(err) => {
println!("{err}");
sleep_s(1).await;
if try_req == MAX_FAILED_API_RATE {
break;
}
try_req += 1;
}
};
}
}
async fn push_ip(client: &Client, server: &str, ip: &IpData) -> Result<(), ReqError> {
let mut data: Vec<IpData> = vec![];
data.push(IpData {
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
});
client
.post(format!("{server}/ips"))
.json(&data)
.send()
.await?;
Ok(())
}
async fn _push_ip_bulk(
ctx: &Context,
ips: &Vec<IpData>,
ret: &mut Vec<String>,
) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
for ip in ips {
data.push(IpData {
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
})
}
let resp = httpclient()
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}

134
src/websocket.rs Normal file
View File

@ -0,0 +1,134 @@
use crate::config::{Context, WebSocketCfg};
use crate::ip::IpEvent;
use crate::utils::{gethostname, sleep_s};
use serde_json::json;
use std::io::{self, Write};
use std::net::TcpStream;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tungstenite::stream::*;
use tungstenite::*;
pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event, cfg);
{
let ctx = ctxarc.read().await;
bootstrap_event = ctx.cfg.bootstrap_event().clone();
cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
}
wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr;
}
pub async fn websocketpubsub(
ctxarc: &Arc<RwLock<Context>>,
txpubsub: Arc<RwLock<Sender<IpEvent>>>,
) {
let cfg;
{
let ctx = ctxarc.read().await;
cfg = ctx.cfg.ws.get("pubsub").unwrap().clone();
}
let mut websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, &gethostname(true)).await.unwrap(),
));
tokio::spawn(async move {
loop {
let mut ws = websocket.write().await;
match ws.read_message() {
Ok(msg) => {
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
Ok(o) => o,
Err(_e) => {
continue;
}
};
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
let txps = txpubsub.write().await;
txps.send(tosend).await.unwrap();
}
}
Err(e) => {
println!("error in pubsub: {e:?}");
ws.close(None).unwrap();
drop(ws);
websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, &gethostname(true)).await.unwrap(),
));
}
};
}
});
}
pub async fn websocketconnect<'a>(
wscfg: &WebSocketCfg,
hostname: &String,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> {
let endpoint = &wscfg.endpoint;
print!("connecting to {} ... ", endpoint);
io::stdout().flush().unwrap();
let mut socket;
loop {
(socket, _) = match connect(endpoint) {
Ok((o, e)) => (o, e),
_ => {
println!("error connecting to {endpoint}, retrying");
sleep_s(1).await;
continue;
}
};
break;
}
println!("connected to {endpoint}");
let msg = json!({ "hostname": hostname });
socket
.write_message(Message::Text(msg.to_string()))
.unwrap();
Ok(socket)
}
pub async fn send_to_ipbl_websocket(
ws: &mut WebSocket<MaybeTlsStream<TcpStream>>,
ip: &IpEvent,
) -> bool {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
if ws.can_write() {
match ws.write_message(Message::Text(msg)) {
Ok(_) => {}
Err(e) => {
println!("err send read: {e:?}");
return handle_websocket_error(ws);
}
};
} else {
return handle_websocket_error(ws);
};
if ws.can_read() {
match ws.read_message() {
Ok(_) => {}
Err(e) => {
println!("err send read: {e:?}");
return handle_websocket_error(ws);
}
};
} else {
return handle_websocket_error(ws);
};
true
}
fn handle_websocket_error(ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> bool {
ws.close(None).unwrap();
return false;
}

View File

@ -1,13 +0,0 @@
use crate::config::ZMQ;
const ZMQPROTO: &str = "tcp";
pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result<zmq::Socket, zmq::Error> {
let zctx = zmq::Context::new();
let zmqhost = &zmqcfg.hostname;
let zmqport = &zmqcfg.port;
let socket = zctx.socket(zmqtype).unwrap();
let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}");
socket.connect(&connectstring.as_str())?;
Ok(socket)
}