Compare commits

..

No commits in common. "master" and "1.2.1" have entirely different histories.

15 changed files with 956 additions and 1143 deletions

View File

@ -8,20 +8,13 @@ platform:
arch: amd64
steps:
- name: build and test
- name: test and build
image: rust:1
pull: always
commands:
- apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev
- curl -o /usr/bin/sccache https://assets.paulbsd.com/sccache_linux_amd64
- chmod +x /usr/bin/sccache
- cargo build --verbose --all
- cargo test --verbose --all
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes:
- name: cargo
path: /usr/local/cargo/registry
@ -33,19 +26,12 @@ steps:
- tag
- name: release
image: rust:1
pull: always
commands:
- apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev
- curl -o /usr/bin/sccache https://assets.paulbsd.com/sccache_linux_amd64
- chmod +x /usr/bin/sccache
- cargo build --release --verbose --all
- cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes:
- name: cargo
path: /usr/local/cargo/registry
@ -87,20 +73,13 @@ platform:
arch: arm64
steps:
- name: build and test
- name: test and build
image: rust:1
pull: always
commands:
- apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev
- curl -o /usr/bin/sccache https://assets.paulbsd.com/sccache_linux_arm64
- chmod +x /usr/bin/sccache
- cargo build --verbose --all
- cargo test --verbose --all
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes:
- name: cargo
path: /usr/local/cargo/registry
@ -112,19 +91,12 @@ steps:
- tag
- name: release
image: rust:1
pull: always
commands:
- apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev
- curl -o /usr/bin/sccache https://assets.paulbsd.com/sccache_linux_arm64
- chmod +x /usr/bin/sccache
- cargo build --release --verbose --all
- cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes:
- name: cargo
path: /usr/local/cargo/registry

1054
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package]
name = "ipblc"
version = "1.7.0"
version = "1.2.1"
edition = "2021"
authors = ["PaulBSD <paul@paulbsd.com>"]
description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
@ -10,20 +10,19 @@ repository = "https://git.paulbsd.com/paulbsd/ipblc"
[dependencies]
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["string"] }
clap = { version = "4.2", features = ["string"] }
git-version = "0.3"
ipnet = "2.9"
ipnet = "2.7"
lazy_static = "1.4"
mnl = "0.2"
nftnl = "0.6"
nix = { version = "0.27", features = ["hostname", "inotify"] }
regex = "1.10"
nix = "0.26"
regex = "1.8"
reqwest = { version = "0.11", default-features = false, features = ["json","rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sd-notify = { version = "0.4" }
tokio = { version = "1.35", features = ["full", "sync"] }
tungstenite = { version = "0.21", features = ["handshake", "rustls-tls-native-roots"] }
tokio = { version = "1.28", features = ["full", "sync"] }
tungstenite = { version = "0.19", features = ["handshake","rustls-tls-native-roots"] }
## to optimize binary size (slow compile time)
#[profile.release]

View File

@ -2,5 +2,5 @@ FROM rustembedded/cross:aarch64-unknown-linux-musl
RUN dpkg --add-architecture arm64
RUN apt-get update
RUN apt-get install -y libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64
RUN apt-get install -y libasound2-dev:arm64 libzmq3-dev libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64
RUN apt-get clean

View File

@ -4,8 +4,8 @@
## Summary
ipblc is client-side intrusion prevention software working closely with ipbl
It's pub/sub features are websockets based
ipblc is a tool that search and send attacking ip addresses to ipbl
It's notification features are based on zeromq
## Howto
@ -48,7 +48,6 @@ Options:
- ✅ Local bound tcp api socket
- ✅ ZMQ -> Websocket
- ✅ Bug in RwLocks (agent often give up)
- ❌ Create memory friendly structs for ipdata
### Notes
@ -58,7 +57,7 @@ See [here](NOTES.md)
## License
```text
Copyright (c) 2022, 2023 PaulBSD
Copyright (c) 2021, 2022, 2023 PaulBSD
All rights reserved.
Redistribution and use in source and binary forms, with or without

View File

@ -6,7 +6,7 @@ use chrono::Duration;
use clap::{Arg, ArgAction, ArgMatches, Command};
use git_version::git_version;
use ipnet::IpNet;
use nix::sys::inotify::{AddWatchFlags, Inotify, WatchDescriptor};
use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify, WatchDescriptor};
use regex::Regex;
use reqwest::{Client, Error as ReqError, Response};
use serde::{Deserialize, Serialize};
@ -17,18 +17,18 @@ use std::path::Path;
pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]);
const MASTERSERVER: &str = "ipbl.paulbsd.com";
const WSSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY_INTERVAL: u64 = 2;
const CONFIG_RETRY: u64 = 1;
const WEB_CLIENT_TIMEOUT: i64 = 5;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Context {
pub blocklist: HashMap<String, BlockIpData>,
pub cfg: Config,
pub discovery: Discovery,
pub flags: Flags,
pub instance: Box<Inotify>,
pub sas: HashMap<String, SetMap>,
pub hashwd: HashMap<String, WatchDescriptor>,
pub reloadinterval: isize,
}
#[derive(Debug, Clone)]
@ -36,7 +36,7 @@ pub struct SetMap {
pub filename: String,
pub fullpath: String,
pub regex: Regex,
pub set: SetCfg,
pub set: Set,
pub watchedfiles: HashMap<String, u64>,
pub wd: WatchDescriptor,
}
@ -48,7 +48,7 @@ pub struct Flags {
}
impl Context {
pub async fn new(inotify: &Inotify) -> Self {
pub async fn new() -> Self {
// Get flags
let argp: ArgMatches = Context::argparse();
let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned();
@ -63,13 +63,13 @@ impl Context {
urls: HashMap::new(),
},
sas: HashMap::new(),
instance: Box::new(Inotify::init(InitFlags::empty()).unwrap()),
blocklist: HashMap::new(),
hashwd: HashMap::new(),
reloadinterval: 5,
};
print!("Loading config ... ");
ctx.load(&inotify).await.unwrap();
ctx.load().await.unwrap();
ctx
}
@ -103,17 +103,17 @@ impl Context {
.send()
.await;
let req = match resp {
Ok(o) => o,
Err(e) => return Err(e),
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Discovery = match req.json().await {
Ok(o) => o,
Err(e) => return Err(e),
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
}
pub async fn load(&mut self, inotify: &Inotify) -> Result<(), Box<dyn std::error::Error>> {
pub async fn load(&mut self) -> Result<(), Box<dyn std::error::Error>> {
if cfg!(test) {
return Ok(());
}
@ -127,17 +127,17 @@ impl Context {
}
break;
}
Err(e) => {
println!("error loading config: {e}, retrying in {CONFIG_RETRY_INTERVAL}s");
Err(err) => {
println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs");
last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await;
sleep_s(CONFIG_RETRY).await;
}
};
}
if last_in_err {
println!("creating sas");
}
self.create_sas(&inotify).await?;
self.create_sas().await?;
if last_in_err {
println!("created sas");
}
@ -153,9 +153,9 @@ impl Context {
res
}
pub async fn get_blocklist_toblock(&self) -> Vec<IpData> {
pub async fn get_blocklist_toblock(&mut self) -> Vec<IpData> {
let mut res: Vec<IpData> = vec![];
for (_, block) in self.blocklist.iter() {
for (_, block) in self.blocklist.iter_mut() {
match self.cfg.sets.get(&block.ipdata.src) {
Some(set) => {
if block.tryfail >= set.tryfail {
@ -169,19 +169,18 @@ impl Context {
}
pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> {
match &ipevent.ipdata {
Some(ipdata) => match self.cfg.sets.get(&ipdata.src) {
match self.cfg.sets.get(&ipevent.ipdata.src) {
Some(set) => {
let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str())
let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str())
.unwrap()
.with_timezone(&chrono::Local);
let blocktime = set.blocktime;
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
let block =
self.blocklist
.entry(ipdata.ip.to_string())
let block = self
.blocklist
.entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipdata.clone(),
ipdata: ipevent.ipdata.clone(),
tryfail: 0,
starttime,
blocktime,
@ -193,9 +192,9 @@ impl Context {
}
} else {
self.blocklist
.entry(ipdata.ip.to_string())
.entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipdata.clone(),
ipdata: ipevent.ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
@ -203,8 +202,6 @@ impl Context {
}
}
None => {}
},
None => {}
}
None
}
@ -231,17 +228,15 @@ impl Context {
removed
}
pub async fn create_sas(
&mut self,
inotify: &Inotify,
) -> Result<(), Box<dyn std::error::Error>> {
pub async fn create_sas(&mut self) -> Result<(), Box<dyn std::error::Error>> {
for (src, set) in self.cfg.sets.iter() {
let p = Path::new(set.path.as_str());
if p.is_dir() {
let wd = match self.hashwd.get(&set.path.to_string()) {
Some(wd) => *wd,
None => {
let res = inotify
let res = self
.instance
.add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY)
.unwrap();
self.hashwd.insert(set.path.to_string(), res);
@ -287,7 +282,7 @@ impl Context {
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Config {
pub sets: HashMap<String, SetCfg>,
pub sets: HashMap<String, Set>,
#[serde(skip_serializing)]
pub trustnets: Vec<String>,
pub ws: HashMap<String, WebSocketCfg>,
@ -299,7 +294,7 @@ impl Config {
Self {
sets: HashMap::from([
("smtp".to_string(),
SetCfg {
Set {
src: "smtp".to_string(),
filename: "mail.log".to_string(),
regex: "(SASL LOGIN authentication failed)".to_string(),
@ -308,7 +303,7 @@ impl Config {
tryfail: 5,
}),
("ssh".to_string(),
SetCfg {
Set {
src: "ssh".to_string(),
filename: "auth.log".to_string(),
regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(),
@ -317,7 +312,7 @@ impl Config {
tryfail: 5,
},),
("http".to_string(),
SetCfg {
Set {
src: "http".to_string(),
filename: "".to_string(),
regex: "(anonymousfox.co)".to_string(),
@ -326,7 +321,7 @@ impl Config {
tryfail: 5,
},),
("openvpn".to_string(),
SetCfg {
Set {
src: "openvpn".to_string(),
filename: "status".to_string(),
regex: "(UNDEF)".to_string(),
@ -355,61 +350,97 @@ impl Config {
}
pub async fn load(&mut self, server: &String) -> Result<(), ReqError> {
self.get_config(server).await?;
self.get_global_config(server).await?;
self.get_trustnets(server).await?;
self.get_sets(server).await?;
self.get_ws_config(server).await?;
Ok(())
}
async fn get_config(&mut self, server: &String) -> Result<(), ReqError> {
async fn get_global_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> =
httpclient().get(format!("{server}/config")).send().await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: HashMap<String, GlobalConfig> = match req.json::<Vec<GlobalConfig>>().await {
Ok(res) => {
let mut out: HashMap<String, GlobalConfig> = HashMap::new();
res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.key.to_string(), x);
});
out
}
Err(err) => return Err(err),
};
let key = "".to_string();
self.api = data
.get(&key.to_string())
.unwrap_or(&GlobalConfig {
key: "api".to_string(),
value: "127.0.0.1:8060".to_string(),
})
.value
.clone();
Ok(())
}
async fn get_trustnets(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/config?v=2"))
.get(format!("{server}/config/trustlist"))
.send()
.await;
let req = match resp {
Ok(o) => o,
Err(e) => return Err(e),
Ok(re) => re,
Err(err) => return Err(err),
};
let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await {
Ok(o) => o,
Err(e) => return Err(e),
let data: Vec<String> = match req.json::<Vec<String>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
self.trustnets = data;
Ok(())
}
for d in data.sets {
async fn get_sets(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/config/sets"))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<Set> = match req.json::<Vec<Set>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
for d in data {
self.sets.insert(d.src.clone(), d);
}
self.trustnets = data.trustlists;
data.ws.into_iter().map(|x| x).for_each(|x| {
self.ws.insert(x.t.to_string(), x);
});
self.api = data
.cfg
.get(&"api".to_string())
.unwrap_or(&self.api)
.clone();
Ok(())
}
pub async fn _get_last(server: &String) -> Result<Vec<IpData>, ReqError> {
let resp = httpclient()
.get(format!("{server}/ips/last"))
.query(&[("interval", "3 hours")])
.send()
.await;
async fn get_ws_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> =
httpclient().get(format!("{server}/config/ws")).send().await;
let req = match resp {
Ok(o) => o,
Err(e) => return Err(e),
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(o) => o,
Err(e) => return Err(e),
let data: HashMap<String, WebSocketCfg> = match req.json::<Vec<WebSocketCfg>>().await {
Ok(res) => {
let mut out: HashMap<String, WebSocketCfg> = HashMap::new();
res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.t.to_string(), x);
});
out
}
Err(err) => return Err(err),
};
Ok(data)
self.ws = data;
Ok(())
}
pub fn build_trustnets(&self) -> Vec<IpNet> {
@ -417,8 +448,8 @@ impl Config {
for trustnet in &self.trustnets {
match trustnet.parse() {
Ok(net) => trustnets.push(net),
Err(e) => {
println!("error parsing {trustnet}, error: {e}");
Err(err) => {
println!("error parsing {trustnet}, error: {err}");
}
};
}
@ -430,7 +461,12 @@ impl Config {
msgtype: String::from("bootstrap"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: None,
ipdata: IpData {
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
},
}
}
}
@ -451,15 +487,13 @@ pub fn httpclient() -> Client {
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct GlobalConfigV2 {
pub cfg: HashMap<String, String>,
pub sets: Vec<SetCfg>,
pub trustlists: Vec<String>,
pub ws: Vec<WebSocketCfg>,
pub struct GlobalConfig {
pub key: String,
pub value: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SetCfg {
pub struct Set {
pub src: String,
pub filename: String,
pub regex: String,
@ -488,13 +522,13 @@ pub struct URL {
pub path: String,
}
impl PartialEq for SetCfg {
impl PartialEq for Set {
fn eq(&self, other: &Self) -> bool {
self.src == other.src
}
}
impl Hash for SetCfg {
impl Hash for Set {
fn hash<H: Hasher>(&self, state: &mut H) {
self.src.hash(state);
}
@ -504,12 +538,10 @@ impl Hash for SetCfg {
mod test {
use super::*;
use crate::ip::*;
use nix::sys::inotify::InitFlags;
use Context;
pub async fn prepare_test_data() -> Context {
let inotify = Inotify::init(InitFlags::empty()).unwrap();
let mut ctx = Context::new(&inotify).await;
let mut ctx = Context::new().await;
let now: DateTime<Local> = Local::now().trunc_subsecs(0);
ctx.blocklist = HashMap::new();
@ -518,13 +550,12 @@ mod test {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ipdata: IpData {
ip: "1.1.1.1".to_string(),
hostname: "test1".to_string(),
date: now.to_rfc3339().to_string(),
src: "ssh".to_string(),
}),
},
})
.await;
}
@ -534,13 +565,12 @@ mod test {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ipdata: IpData {
ip: "1.1.1.2".to_string(),
hostname: "test2".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
},
})
.await;
}
@ -549,13 +579,12 @@ mod test {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ipdata: IpData {
ip: "1.1.1.3".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
},
})
.await;
@ -563,13 +592,12 @@ mod test {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ipdata: IpData {
ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
},
})
.await;
@ -577,33 +605,19 @@ mod test {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ipdata: IpData {
ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
})
.await;
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 6,
ip: "2a00:1450:4007:805::2003".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
},
})
.await;
let ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap();
let mut ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap();
ip1.starttime = DateTime::from(now) - Duration::minutes(61);
let ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap();
let mut ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap();
ip2.starttime = DateTime::from(now) - Duration::minutes(62);
ctx
}
@ -613,14 +627,8 @@ mod test {
let ctx = prepare_test_data().await;
let pending = ctx.get_blocklist_pending().await;
assert_eq!(pending.len(), 5);
let ips = [
"1.1.1.1",
"1.1.1.2",
"1.1.1.3",
"1.1.1.4",
"2a00:1450:4007:805::2003",
];
assert_eq!(pending.len(), 4);
let ips = ["1.1.1.1", "1.1.1.2", "1.1.1.3", "1.1.1.4"];
for i in ips {
let ip = ctx
.blocklist
@ -638,7 +646,7 @@ mod test {
let mut ctx = prepare_test_data().await;
ctx.gc_blocklist().await;
let toblock = ctx.get_blocklist_toblock().await;
assert_eq!(toblock.len(), 3);
assert_eq!(toblock.len(), 2);
}
#[tokio::test]

159
src/fw.rs
View File

@ -2,79 +2,13 @@ use crate::ip::IpData;
use crate::ipblc::PKG_NAME;
use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
use std::{
ffi::CString,
io::Error,
net::{Ipv4Addr, Ipv6Addr},
};
use std::{ffi::CString, io::Error, net::Ipv4Addr};
pub enum FwTableType {
IPv4,
IPv6,
}
pub fn fwglobalinit<'a>() -> ((Batch, Table), (Batch, Table)) {
let (batch4, table4) = fwinit(FwTableType::IPv4);
let (batch6, table6) = fwinit(FwTableType::IPv6);
((batch4, table4), (batch6, table6))
}
macro_rules! initrules {
($batch:expr, $table:expr, $chain:ident) => {
$chain.set_hook(nftnl::Hook::In, 1);
$chain.set_policy(nftnl::Policy::Accept);
$batch.add(&$chain, nftnl::MsgType::Add);
$batch.add(&Rule::new(&$chain), nftnl::MsgType::Del);
let mut rule = Rule::new(&$chain);
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept));
$batch.add(&rule, nftnl::MsgType::Add);
};
}
macro_rules! createrules {
($ipdata:ident, $chain:ident, $batch:ident, $t:ty, $ip_t:ident) => {
let mut rule = Rule::new(&$chain);
let ip = $ipdata.ip.parse::<$t>().unwrap();
rule.add_expr(&nft_expr!(payload $ip_t saddr));
rule.add_expr(&nft_expr!(cmp == ip));
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict drop));
$batch.add(&rule, nftnl::MsgType::Add);
}
}
fn fwinit(t: FwTableType) -> (Batch, Table) {
let table_name: String;
let table: Table;
match t {
FwTableType::IPv4 => {
table_name = format!("{PKG_NAME}4");
table = Table::new(
&CString::new(format!("{table_name}")).unwrap(),
ProtoFamily::Ipv4,
);
}
FwTableType::IPv6 => {
table_name = format!("{PKG_NAME}6");
table = Table::new(
&CString::new(format!("{table_name}")).unwrap(),
ProtoFamily::Ipv6,
);
}
}
pub fn fwinit() -> (Batch, Table) {
let mut batch = Batch::new();
let table = Table::new(&CString::new(PKG_NAME).unwrap(), ProtoFamily::Ipv4);
batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del);
@ -83,62 +17,53 @@ fn fwinit(t: FwTableType) -> (Batch, Table) {
}
pub fn fwblock(
ips_add_all: &Vec<IpData>,
ips_add: &Vec<IpData>,
ret: &mut Vec<String>,
fwlen: &mut usize,
) -> std::result::Result<(), Error> {
let ((mut batch4, table4), (mut batch6, table6)) = fwglobalinit();
// convert chain
let ips_add = convert(ips_add);
let (mut batch, table) = fwinit();
let mut chain4 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table4);
let mut chain6 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table6);
// build chain
let mut chain = Chain::new(&CString::new(PKG_NAME).unwrap(), &table);
chain.set_hook(nftnl::Hook::In, 1);
chain.set_policy(nftnl::Policy::Accept);
initrules!(batch4, table4, chain4);
initrules!(batch6, table6, chain6);
// add chain
batch.add(&chain, nftnl::MsgType::Add);
let mut factor = 1;
if ips_add_all.len() > 100 {
factor = (ips_add_all.len() / 10) as usize
}
let rule = Rule::new(&chain);
batch.add(&rule, nftnl::MsgType::Del);
let ips_add_tmp: Vec<IpData> = ips_add_all.clone().iter().map(|x| x.clone()).collect();
let mut ips_add_iter = ips_add_tmp.chunks(factor);
let mut ips_add: Vec<&[IpData]> = vec![];
while let Some(x) = ips_add_iter.next() {
ips_add.push(x);
}
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept));
batch.add(&rule, nftnl::MsgType::Add);
// build and add rules
for ipdata_group in ips_add.clone() {
for ipdata in ipdata_group {
match ipdata.t {
4 => {
createrules!(ipdata, chain4, batch4, Ipv4Addr, ipv4);
}
6 => {
createrules!(ipdata, chain6, batch6, Ipv6Addr, ipv6);
}
_ => {}
}
}
for ip in ips_add.clone() {
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(payload ipv4 saddr));
rule.add_expr(&nft_expr!(cmp == ip));
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict drop));
batch.add(&rule, nftnl::MsgType::Add);
}
// validate and send batch
for b in [batch4, batch6] {
let bf = b.finalize();
match send_and_process(&bf) {
Ok(_) => {}
Err(e) => {
println!("error sending batch: {e}");
let finalized_batch = batch.finalize();
send_and_process(&finalized_batch)?;
if fwlen != &mut ips_add.len() {
ret.push(format!("{length} ip in firewall", length = ips_add.len()));
}
};
}
if fwlen != &mut ips_add_all.len() {
ret.push(format!(
"{length} ip in firewall",
length = ips_add_all.len()
));
}
*fwlen = ips_add_all.len();
*fwlen = ips_add.len();
Ok(())
}
@ -169,3 +94,11 @@ fn socket_recv<'a>(
Ok(None)
}
}
fn convert(input: &Vec<IpData>) -> Vec<Ipv4Addr> {
let mut output: Vec<Ipv4Addr> = vec![];
for val in input {
output.push(val.ip.parse::<Ipv4Addr>().unwrap());
}
output
}

134
src/ip.rs
View File

@ -1,10 +1,11 @@
use crate::config::httpclient;
use crate::utils::gethostname;
use chrono::offset::LocalResult;
use chrono::prelude::*;
use ipnet::IpNet;
use lazy_static::lazy_static;
use regex::Regex;
use reqwest::Error as ReqError;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
@ -22,27 +23,15 @@ pub struct IpEvent {
pub msgtype: String,
pub mode: String,
pub hostname: String,
pub ipdata: Option<IpData>,
pub ipdata: IpData,
}
#[macro_export]
macro_rules! ipevent {
($msgtype:expr,$mode:expr,$hostname:expr,$ipdata:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: $ipdata,
}
};
($msgtype:expr,$mode:expr,$hostname:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: None,
}
};
#[derive(Clone, Debug, Serialize, Deserialize, Eq)]
pub struct IpData {
pub ip: String,
pub src: String,
pub date: String,
pub hostname: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -53,28 +42,6 @@ pub struct BlockIpData {
pub starttime: DateTime<Local>,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq)]
pub struct IpData {
pub t: isize,
pub ip: String,
pub src: String,
pub date: String,
pub hostname: String,
}
#[macro_export]
macro_rules! ipdata {
($t:expr,$ip:expr,$src:expr,$date:expr,$hostname:expr) => {
IpData {
t: $t.clone(),
ip: $ip.clone(),
src: $src.clone(),
date: $date.clone(),
hostname: $hostname.clone(),
}
};
}
impl PartialEq for IpData {
fn eq(&self, other: &IpData) -> bool {
self.ip.as_bytes() == other.ip.as_bytes() && self.src == other.src
@ -110,7 +77,7 @@ impl Display for IpData {
}
pub fn filter(
reader: Box<dyn Read>,
lines: Box<dyn Read>,
iplist: &mut Vec<IpData>,
trustnets: &Vec<IpNet>,
regex: &Regex,
@ -119,23 +86,19 @@ pub fn filter(
) -> isize {
let mut ips = 0;
let hostname = gethostname(true);
let lines = BufReader::new(reader).lines();
for line in lines.into_iter() {
for line in BufReader::new(lines).lines() {
if let Ok(l) = line {
if regex.is_match(l.as_str()) {
let s_ipaddr: String;
let t: isize;
match R_IPV4.captures(l.as_str()) {
Some(sv4) => {
s_ipaddr = sv4.get(0).unwrap().as_str().to_string();
t = 4;
}
None => {
match R_IPV6.captures(l.as_str()) {
Some(sv6) => {
s_ipaddr = sv6.get(0).unwrap().as_str().to_string();
t = 6;
}
None => {
continue;
@ -144,14 +107,6 @@ pub fn filter(
}
};
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
let s_date: DateTime<Local>;
match R_DATE.captures(l.as_str()) {
Some(sdt) => {
@ -165,8 +120,21 @@ pub fn filter(
}
};
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
if !is_trusted(&ipaddr, &trustnets) {
iplist.push(ipdata!(t, s_ipaddr, src, s_date.to_rfc3339(), hostname));
iplist.push(IpData {
ip: s_ipaddr,
src: src.to_owned(),
date: s_date.to_rfc3339().to_owned(),
hostname: hostname.to_owned(),
});
ips += 1;
};
}
@ -176,24 +144,28 @@ pub fn filter(
}
fn parse_date(input: regex::Captures) -> DateTime<Local> {
let mut ymd: Vec<u32> = vec![];
let mut hms: Vec<u32> = vec![];
let ymd_range = 2..5;
let hms_range = 5..8;
let mut ymd: Vec<u64> = vec![];
let mut hms: Vec<u64> = vec![];
for cap in ymd_range {
ymd.push(input.get(cap).unwrap().as_str().parse::<u32>().unwrap());
let (daterange, hourrange) = (2..5, 5..8);
for i in daterange {
ymd.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
}
for cap in hms_range {
hms.push(input.get(cap).unwrap().as_str().parse::<u32>().unwrap());
for i in hourrange {
hms.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
}
let date: DateTime<Local> =
match Local.with_ymd_and_hms(ymd[0] as i32, ymd[1], ymd[2], hms[0], hms[1], hms[2]) {
LocalResult::Single(s) => s,
LocalResult::Ambiguous(a, _b) => a,
LocalResult::None => Local::now().trunc_subsecs(0),
};
let date = Local
.with_ymd_and_hms(
ymd[0] as i32,
ymd[1] as u32,
ymd[2] as u32,
hms[0] as u32,
hms[1] as u32,
hms[2] as u32,
)
.unwrap();
date
}
@ -205,3 +177,23 @@ fn is_trusted(ip: &IpAddr, trustnets: &Vec<IpNet>) -> bool {
}
false
}
pub async fn _get_last(server: &String) -> Result<Vec<IpData>, ReqError> {
let resp = httpclient()
.get(format!("{server}/ips/last"))
.query(&[("interval", "3 hours")])
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
}

View File

@ -1,7 +1,6 @@
use crate::config::{Context, GIT_VERSION};
use crate::fw::{fwblock, fwglobalinit};
use crate::fw::{fwblock, fwinit};
use crate::ip::{filter, IpData, IpEvent};
use crate::ipevent;
use crate::monitoring::apiserver;
use crate::utils::{gethostname, read_lines, sleep_s};
use crate::webservice::send_to_ipbl_api;
@ -10,8 +9,7 @@ use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep}
use chrono::prelude::*;
use chrono::prelude::{DateTime, Local};
use chrono::Duration;
use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent};
use sd_notify::*;
use nix::sys::inotify::InotifyEvent;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender};
@ -22,25 +20,16 @@ const BL_CHAN_SIZE: usize = 32;
const WS_CHAN_SIZE: usize = 64;
const LOOP_MAX_WAIT: u64 = 5;
macro_rules! log_with_systemd {
($msg:expr) => {
println!("{}", $msg);
notify(false, &[NotifyState::Status(format!("{}", $msg).as_str())]).unwrap();
};
}
pub async fn run() {
let inotify = Inotify::init(InitFlags::empty()).unwrap();
let globalctx = Context::new(&inotify).await;
let globalctx = Context::new().await;
let ctxarc = Arc::new(RwLock::new(globalctx));
let mut fwlen: usize = 0;
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
log_with_systemd!(format!("Launching {}, version {}", PKG_NAME, pkgversion));
fwglobalinit();
println!("Launching {}, version {}", PKG_NAME, pkgversion);
fwinit();
let ctxapi = Arc::clone(&ctxarc);
apiserver(&ctxapi).await.unwrap();
@ -58,9 +47,7 @@ pub async fn run() {
let mut wssocketrr = websocketreqrep(&ctxwsrr).await;
// init file watcher
let inoarc = Arc::new(RwLock::new(inotify));
let inoclone = Arc::clone(&inoarc);
let mut blrx = watchfiles(inoclone).await;
let mut blrx = watchfiles(&ctxarc).await;
let ctxclone = Arc::clone(&ctxarc);
let ipeventclone = Arc::clone(&ipeventtxarc);
@ -68,8 +55,6 @@ pub async fn run() {
compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
});
notify(false, &[NotifyState::Ready]).unwrap();
loop {
let mut ret: Vec<String> = Vec::new();
@ -79,16 +64,22 @@ pub async fn run() {
ipevent = ipeventrx.recv() => {
let received_ip = ipevent.unwrap();
let (toblock,server) = {
let ctx = ctxclone.read().await;
(ctx.get_blocklist_toblock().await,ctx.flags.server.clone())
};
let (toblock,server);
{
let mut ctx = ctxclone.write().await;
toblock = ctx.get_blocklist_toblock().await;
server = ctx.flags.server.clone();
}
if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in toblock {
let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send));
let ipe = IpEvent{
msgtype: String::from("init"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: ip_to_send,
};
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
break;
}
@ -97,31 +88,32 @@ pub async fn run() {
}
// refresh context blocklist
let filtered_ipevent = {
ctxarc.write().await.update_blocklist(&received_ip).await
};
let filtered_ipevent;
{
let mut ctx = ctxarc.write().await;
filtered_ipevent = ctx.update_blocklist(&received_ip).await;
}
// send ip list to api and ws sockets
if let Some(ipevent) = filtered_ipevent {
if received_ip.msgtype != "init" {
log_with_systemd!(format!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip));
let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata);
println!("sending {} to api and ws", ipevent.ipdata.ip);
let ipe = IpEvent{
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: ipevent.ipdata,
};
send_to_ipbl_api(&server.clone(), &ipe).await;
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
if !status {
wssocketrr = websocketreqrep(&ctxwsrr).await;
continue;
}
}
}
}
_val = sleep_s(LOOP_MAX_WAIT) => {
let ipe = ipevent!("ping", "ws", gethostname(true));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
}
}
_val = sleep_s(LOOP_MAX_WAIT) => {}
};
let ctxclone = Arc::clone(&ctxarc);
@ -129,43 +121,19 @@ pub async fn run() {
// log lines
if ret.len() > 0 {
let result = ret.join(", ");
log_with_systemd!(format!("{result}"));
println!("{ret}", ret = ret.join(", "));
}
let ctxclone = Arc::clone(&ctxarc);
let inoclone = Arc::clone(&inoarc);
handle_cfg_reload(&ctxclone, &mut last_cfg_reload, inoclone).await;
handle_cfg_reload(&ctxclone, &mut last_cfg_reload).await;
}
}
async fn handle_cfg_reload(
ctxclone: &Arc<RwLock<Context>>,
last_cfg_reload: &mut DateTime<Local>,
inoarc: Arc<RwLock<Inotify>>,
) {
async fn handle_cfg_reload(ctxclone: &Arc<RwLock<Context>>, last_cfg_reload: &mut DateTime<Local>) {
let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) {
let inotify;
loop {
inotify = match inoarc.try_read() {
Ok(o) => o,
Err(e) => {
println!("{e}");
sleep_s(1).await;
continue;
}
};
break;
}
let mut ctxtest = match ctxclone.try_write() {
Ok(o) => o,
Err(e) => {
println!("{e}");
return;
}
};
match ctxtest.load(&inotify).await {
let mut ctx = ctxclone.write().await;
match ctx.load().await {
Ok(_) => {
*last_cfg_reload = Local::now().trunc_subsecs(0);
}
@ -177,29 +145,34 @@ async fn handle_cfg_reload(
}
async fn handle_fwblock(ctxclone: Arc<RwLock<Context>>, ret: &mut Vec<String>, fwlen: &mut usize) {
{
let toblock = {
let mut ctx = ctxclone.write().await;
ctx.gc_blocklist().await;
}
let toblock = {
let ctx = ctxclone.read().await;
ctx.get_blocklist_toblock().await
};
// apply firewall blocking
match fwblock(&toblock, ret, fwlen) {
Ok(_) => {}
Err(e) => {
println!("err: {e}, unable to push firewall rules, use super user")
Err(err) => {
println!("Err: {err}, unable to push firewall rules, use super user")
}
};
}
async fn watchfiles(inoarc: Arc<RwLock<Inotify>>) -> Receiver<FileEvent> {
async fn watchfiles(ctxarc: &Arc<RwLock<Context>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
let ctxclone = Arc::clone(ctxarc);
tokio::spawn(async move {
loop {
let events = inoarc.read().await.read_events().unwrap();
let events;
let instance;
{
let ctx = ctxclone.read().await;
instance = ctx.instance.clone();
}
events = instance.read_events().unwrap();
for inevent in events {
let date: DateTime<Local> = Local::now().trunc_subsecs(0);
@ -212,12 +185,12 @@ async fn watchfiles(inoarc: Arc<RwLock<Inotify>>) -> Receiver<FileEvent> {
async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, bool) {
let currentlen = match std::fs::metadata(&path.to_string()) {
Ok(u) => u.len(),
Ok(u) => u.len().clone(),
Err(_) => 0u64,
};
let lastlen = match w.insert(path.to_string(), currentlen) {
Some(u) => u,
None => currentlen,
None => 0u64,
};
(lastlen, lastlen != currentlen)
}
@ -232,16 +205,20 @@ async fn compare_files_changes(
let modfiles = inrx.recv().await.unwrap();
let mut iplist: Vec<IpData> = vec![];
let sas = {
let sask;
let sas;
{
let ctx = ctxarc.read().await;
sas = ctx.clone().sas;
sask = sas.keys();
tnets = ctx.cfg.build_trustnets();
ctx.sas.clone()
};
}
match modfiles.inevent.name {
Some(name) => {
let filename = name.to_str().unwrap();
for (sak, sa) in sas.clone().iter_mut() {
for sak in sask {
let sa = sas.get(sak).unwrap();
if modfiles.inevent.wd == sa.wd {
let handle: String;
if sa.filename.as_str() == "" {
@ -252,11 +229,13 @@ async fn compare_files_changes(
continue;
}
let (filesize, sizechanged) = {
let (filesize, sizechanged);
{
let mut ctx = ctxarc.write().await;
let sa = ctx.sas.get_mut(sak).unwrap();
get_last_file_size(&mut sa.watchedfiles, &handle).await
};
(filesize, sizechanged) =
get_last_file_size(&mut sa.watchedfiles, &handle).await;
}
if !sizechanged {
continue;
@ -279,9 +258,14 @@ async fn compare_files_changes(
}
}
for ip in iplist {
let ipe = ipevent!("add", "file", gethostname(true), Some(ip));
let ipetx = ipeventtx.read().await;
ipetx.send(ipe).await.unwrap();
let ipevent = IpEvent {
msgtype: String::from("add"),
hostname: gethostname(true),
mode: String::from("file"),
ipdata: ip,
};
let ipetx = ipeventtx.write().await;
ipetx.send(ipevent).await.unwrap();
}
}
None => {}

View File

@ -4,72 +4,54 @@ use serde_json;
use std::io;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpSocket;
use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxarc = ctxarc.clone();
let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() };
let listener = match TcpListener::bind(addr).await {
Ok(o) => o,
Err(e) => {
println!("error: {e}");
std::process::exit(1);
let addr;
{
let ctx = ctxarc.read().await;
addr = ctx.cfg.api.parse().unwrap();
}
};
let socket = TcpSocket::new_v4().unwrap();
socket.bind(addr).unwrap();
socket.set_reuseaddr(true).unwrap();
let listener = socket.listen(1024).unwrap();
tokio::spawn(async move {
loop {
//apitx.send(String::from("")).await.unwrap();
match listener.accept().await {
Ok((mut socket, _addr)) => {
let mut buf = vec![0; 1024];
Ok((stream, _addr)) => {
//let mut buf = [0; 1024];
let data;
{
let ctx = ctxarc.read().await;
data = serde_json::to_string(&ctx.blocklist);
}
match socket.readable().await {
Ok(_) => {
match socket.try_read(&mut buf) {
match data {
Ok(dt) => {
let (_reader, mut writer) = stream.into_split();
match writer.write_all(format!("{dt}").as_bytes()).await {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
continue;
}
};
}
Err(e) => {
println!("error: {e}");
continue;
}
}
let msg = match String::from_utf8(buf.to_vec()) {
Ok(o) => o.trim_matches(char::from(0)).trim().to_string(),
Err(_) => "".to_string(),
};
let res = format_result(&ctxarc, msg.as_str()).await;
match socket.write_all(res.as_bytes()).await {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
Err(err) => {
println!("{err}");
}
}
}
Err(err) => {
println!("error: {err}");
println!("unable to serialize data: {err}");
}
}
}
Err(err) => {
println!("couldn't get client: {}", err)
}
}
}
});
Ok(())
}
async fn format_result(ctxarc: &Arc<RwLock<Context>>, mode: &str) -> String {
let data;
let ctx = ctxarc.read().await;
match mode {
"cfg" => data = serde_json::to_string(&ctx.cfg).unwrap(),
"blocklist" => data = serde_json::to_string(&ctx.blocklist).unwrap(),
_ => data = serde_json::to_string(&ctx.blocklist).unwrap(),
};
data
}

View File

@ -1,28 +0,0 @@
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}
pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
// Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
}
pub async fn _sleep_ms(ms: u64) {
sleep(Duration::from_millis(ms)).await;
}

View File

@ -1 +1 @@
(((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*)|(((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?)
((^\s*((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*$)|(^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$))

View File

@ -1,14 +1,21 @@
use lazy_static::lazy_static;
use nix::unistd;
use regex::Regex;
use std::boxed::Box;
use std::fs::File;
use std::io::*;
use std::path::Path;
use tokio::time::{sleep, Duration};
lazy_static! {
static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap();
}
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
let mut file = match File::open(filename) {
Ok(o) => o,
Err(e) => {
println!("error: {e}");
Ok(f) => f,
Err(err) => {
println!("{err}");
return None;
}
};
@ -17,13 +24,21 @@ pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
Some(lines)
}
pub async fn sleep_s(s: u64) {
sleep(Duration::from_secs(s)).await;
pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
// Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
}
#[allow(dead_code)]
pub async fn sleep_ms(m: u64) {
sleep(Duration::from_millis(m)).await;
pub async fn _sleep_ms(ms: u64) {
sleep(Duration::from_millis(ms)).await;
}
pub async fn sleep_s(s: u64) {
sleep(Duration::from_secs(s)).await;
}
pub fn gethostname(show_fqdn: bool) -> String {
@ -38,3 +53,19 @@ pub fn gethostname(show_fqdn: bool) -> String {
}
hostname[0].to_string()
}
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}

View File

@ -11,12 +11,12 @@ pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut try_req = 0;
let client = httpclient();
loop {
match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await {
match push_ip(&client, &server, &ip.ipdata).await {
Ok(_) => {
break;
}
Err(e) => {
println!("error: {e}");
Err(err) => {
println!("{err}");
sleep_s(1).await;
if try_req == MAX_FAILED_API_RATE {
break;
@ -31,7 +31,6 @@ async fn push_ip(client: &Client, server: &str, ip: &IpData) -> Result<(), ReqEr
let mut data: Vec<IpData> = vec![];
data.push(IpData {
t: ip.t,
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
@ -57,7 +56,6 @@ async fn _push_ip_bulk(
for ip in ips {
data.push(IpData {
t: ip.t,
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),

View File

@ -14,13 +14,13 @@ use tungstenite::*;
pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event, wscfg);
let (mut wssocketrr, bootstrap_event, cfg);
{
let ctx = ctxarc.read().await;
bootstrap_event = ctx.cfg.bootstrap_event().clone();
wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
}
wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap();
wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr;
@ -41,30 +41,21 @@ pub async fn websocketpubsub(
tokio::spawn(async move {
loop {
let mut ws = websocket.write().await;
match ws.read() {
match ws.read_message() {
Ok(msg) => {
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
Ok(o) => o,
Err(e) => {
println!("error in pubsub: {e:?}");
Err(_e) => {
continue;
}
};
match tosend.ipdata.clone() {
Some(o) => {
if o.hostname != gethostname(true)
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
let txps = txpubsub.read().await;
let txps = txpubsub.write().await;
txps.send(tosend).await.unwrap();
}
}
None => {
let txps = txpubsub.read().await;
txps.send(tosend.clone()).await.unwrap();
}
}
}
Err(e) => {
println!("error in pubsub: {e:?}");
ws.close(None).unwrap();
@ -99,7 +90,9 @@ pub async fn websocketconnect<'a>(
}
println!("connected to {endpoint}");
let msg = json!({ "hostname": hostname });
socket.send(Message::Text(msg.to_string())).unwrap();
socket
.write_message(Message::Text(msg.to_string()))
.unwrap();
Ok(socket)
}
@ -110,30 +103,32 @@ pub async fn send_to_ipbl_websocket(
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
if ws.can_write() {
match ws.send(Message::Text(msg)) {
match ws.write_message(Message::Text(msg)) {
Ok(_) => {}
Err(e) => {
println!("err send read: {e:?}");
return false;
return handle_websocket_error(ws);
}
};
} else {
println!("can't write to socket");
return false;
return handle_websocket_error(ws);
};
if ws.can_read() {
match ws.read() {
match ws.read_message() {
Ok(_) => {}
Err(e) => {
println!("err send read: {e:?}");
return false;
return handle_websocket_error(ws);
}
};
} else {
println!("can't read from socket");
return false;
return handle_websocket_error(ws);
};
true
}
fn handle_websocket_error(ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> bool {
ws.close(None).unwrap();
return false;
}