Compare commits

..

No commits in common. "master" and "1.1.2-dev" have entirely different histories.

19 changed files with 1313 additions and 2148 deletions

View File

@ -8,20 +8,13 @@ platform:
arch: amd64 arch: amd64
steps: steps:
- name: build and test - name: test and build
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev - apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH} - cargo build --verbose --all
- chmod +x $${RUSTC_WRAPPER} - cargo test --verbose --all
- cargo b -v
- cargo t -v
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -33,19 +26,12 @@ steps:
- tag - tag
- name: release - name: release
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev - apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH} - cargo build --release --verbose --all
- chmod +x $${RUSTC_WRAPPER}
- cargo b -r -v
- cd target/release - cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -61,6 +47,9 @@ steps:
api_key: api_key:
from_secret: gitea_token from_secret: gitea_token
files: "target/release/*.tar.gz" files: "target/release/*.tar.gz"
checksum:
- sha256
- sha512
environment: environment:
PLUGIN_TITLE: "" PLUGIN_TITLE: ""
when: when:
@ -84,20 +73,13 @@ platform:
arch: arm64 arch: arm64
steps: steps:
- name: build and test - name: test and build
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev - apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH} - cargo build --verbose --all
- chmod +x $${RUSTC_WRAPPER} - cargo test --verbose --all
- cargo b -v
- cargo t -v
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -109,19 +91,12 @@ steps:
- tag - tag
- name: release - name: release
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev - apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH} - cargo build --release --verbose --all
- chmod +x $${RUSTC_WRAPPER}
- cargo b -r -v
- cd target/release - cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -137,6 +112,9 @@ steps:
api_key: api_key:
from_secret: gitea_token from_secret: gitea_token
files: "target/release/*.tar.gz" files: "target/release/*.tar.gz"
checksum:
- sha256
- sha512
environment: environment:
PLUGIN_TITLE: "" PLUGIN_TITLE: ""
when: when:

1904
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package] [package]
name = "ipblc" name = "ipblc"
version = "1.8.1" version = "1.1.0"
edition = "2021" edition = "2021"
authors = ["PaulBSD <paul@paulbsd.com>"] authors = ["PaulBSD <paul@paulbsd.com>"]
description = "ipblc is a tool that search and send attacking ip addresses to ipbl" description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
@ -10,20 +10,19 @@ repository = "https://git.paulbsd.com/paulbsd/ipblc"
[dependencies] [dependencies]
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.5", features = ["string"] } clap = { version = "4.2", features = ["string"] }
git-version = "0.3" git-version = "0.3"
ipnet = "2.11" ipnet = "2.7"
lazy_static = "1.5" lazy_static = "1.4"
nix = { version = "0.30", features = ["hostname", "inotify"] } mnl = "0.2"
regex = "1.11" nftnl = "0.6"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } nix = "0.26"
rustables = "0.8.6" regex = "1.8"
rustables-macros = "0.1.2" reqwest = { version = "0.11", default-features = false, features = ["json","rustls-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sd-notify = { version = "0.4" } tokio = { version = "1.28", features = ["full", "sync"] }
tokio = { version = "1.45", features = ["full", "sync"] } tungstenite = { version = "0.19", features = ["handshake","rustls-tls-native-roots"] }
tungstenite = { version = "0.26", features = ["handshake", "rustls-tls-native-roots"] }
## to optimize binary size (slow compile time) ## to optimize binary size (slow compile time)
#[profile.release] #[profile.release]

View File

@ -2,5 +2,5 @@ FROM rustembedded/cross:aarch64-unknown-linux-musl
RUN dpkg --add-architecture arm64 RUN dpkg --add-architecture arm64
RUN apt-get update RUN apt-get update
RUN apt-get install -y libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64 RUN apt-get install -y libasound2-dev:arm64 libzmq3-dev libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64
RUN apt-get clean RUN apt-get clean

View File

@ -4,8 +4,8 @@
## Summary ## Summary
ipblc is client-side intrusion prevention software working closely with ipbl ipblc is a tool that search and send attacking ip addresses to ipbl
It's pub/sub features are websockets based It's notification features are based on zeromq
## Howto ## Howto
@ -47,8 +47,7 @@ Options:
- ✅ Error handing when fetching config - ✅ Error handing when fetching config
- ✅ Local bound tcp api socket - ✅ Local bound tcp api socket
- ✅ ZMQ -> Websocket - ✅ ZMQ -> Websocket
- ✅ Bug in RwLocks (agent often give up) - ❌ Bug in RwLocks (agent often give up)
- ❌ Create memory friendly structs for ipdata
### Notes ### Notes
@ -58,7 +57,7 @@ See [here](NOTES.md)
## License ## License
```text ```text
Copyright (c) 2022, 2023 PaulBSD Copyright (c) 2021, 2022, 2023 PaulBSD
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without Redistribution and use in source and binary forms, with or without

View File

@ -1,143 +0,0 @@
use rustables::*;
use rustables::{expr::*, Chain, Rule, Table};
use std::{io::*, net::*};
const TABLE_NAME: &str = "ipblc4";
const CHAIN_NAME: &str = "ipblc";
fn main() -> Result<()> {
/*let name = "blabla";
let mut batch = Batch::new();
let table = Table::new(ProtocolFamily::Ipv4).with_name(name);
batch.add(&table, MsgType::Add);
let mut chain = Chain::new(&table).with_name(name);
batch.add(&chain, MsgType::Add);
let toadd1: Ipv4Addr = "9.9.9.8".parse().unwrap();
let toadd2: Ipv4Addr = "9.9.9.1".parse().unwrap();
let mut setbuilder: SetBuilder<Ipv4Addr> = SetBuilder::new("s1", &table).unwrap();
setbuilder.add(&toadd1);
setbuilder.add(&toadd2);
let (mut set, setelem) = setbuilder.finish();
batch.add(&setelem, MsgType::Add);
//batch.add(&set, MsgType::Add);
set.family = ProtocolFamily::Ipv4;
set.id = Some(5);
set.flags = Some(0);
set.userdata = Some("test".into());
println!("{:?}", setelem);*/
let get_table = || -> Result<Option<Table>> {
let tables = list_tables().unwrap();
for table in tables {
if let Some(name) = table.get_name() {
println!("Found table {}", name);
if name == TABLE_NAME {
return Ok(Some(table));
}
}
}
Ok(None)
};
let get_chain = |table: &Table| -> Result<Option<Chain>> {
let chains = list_chains_for_table(table).unwrap();
for chain in chains {
if let Some(name) = chain.get_name() {
println!("Found chain {}", name);
if name == CHAIN_NAME {
return Ok(Some(chain));
}
}
}
Ok(None)
};
let table = get_table().unwrap().expect("no table?");
let chain = get_chain(&table).unwrap().expect("no chain?");
let ip: IpAddr = "184.73.167.217".parse().unwrap();
let cmprule = Rule::new(&chain).unwrap().saddr(ip).drop();
println!("{:?}", cmprule);
let mut gexpr = RawExpression::default();
for e in cmprule.get_expressions().unwrap().iter() {
if let Some(ExpressionVariant::Cmp(_)) = e.get_data() {
gexpr = e.clone();
}
}
let rules = list_rules_for_chain(&chain).unwrap();
for rule in rules {
let handle = rule.get_handle().unwrap();
println!("handle {}", handle);
let exprs = rule.get_expressions().unwrap();
for expr in exprs.iter() {
if let Some(ExpressionVariant::Cmp(_)) = expr.get_data() {
if expr.clone() == gexpr {
println!("{:?}", expr.get_data());
println!("test");
break;
}
}
//if expr.get_data()
//if expr.
}
}
//let mut set: Set<Ipv4Addr> = nft_set!(
// &CString::new("blabla").unwrap(),
// 32,
// &table,
// ProtoFamily::Ipv4 //ProtoFamily::Ipv4;
// //[&toadd1,&toadd2,]
//);
////println!("{:?}", set.0);
//set.add(&toadd1);
//set.add(&toadd2);
//batch.add(&set, MsgType::Add);
//let mut rule = Rule::new(&chain)
// .unwrap()
// .with_expr(
// HighLevelPayload::Network(NetworkHeaderField::IPv4(IPv4HeaderField::Saddr)).build(),
// )
// .with_expr(Lookup::new(&set).unwrap())
// .with_expr(Immediate::new_verdict(VerdictKind::Accept));
//println!("{:?}", rule);
//batch.add(&rule, rustables::MsgType::Add);
//match batch.send() {
// Ok(o) => {}
// Err(e) => {
// println!("{e}");
// }
//}
//rule.add_expr(&nft_expr!(payload ipv4 saddr));
//#[rustfmt::skip]
//rule.add_expr(&nft_expr!(lookup &set));
//rule.add_expr(&nft_expr!(ct state));
//rule.add_expr(&nft_expr!(verdict drop));
//batch.add(&rule, MsgType::Add);
//let finalized_batch = batch.finalize();
//send_and_process(&finalized_batch)?;
Ok(())
}
#[allow(dead_code)]
#[derive(Debug)]
struct Error(String);
impl<T: std::error::Error> From<T> for Error {
fn from(error: T) -> Self {
Error(error.to_string())
}
}

View File

@ -1,65 +0,0 @@
use nftnl::{nft_expr, set::Set, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
use std::{ffi::CString, io::*, net::Ipv4Addr};
fn main() -> std::result::Result<(), Error> {
let table_name = format!("ipblc4");
let table = Table::new(
&CString::new(format!("{table_name}")).unwrap(),
ProtoFamily::Ipv4,
);
let mut batch = Batch::new();
batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del);
batch.add(&table, nftnl::MsgType::Add);
let mut chain = Chain::new(&CString::new("test").unwrap(), &table);
chain.set_hook(nftnl::Hook::In, 1);
chain.set_policy(nftnl::Policy::Accept);
batch.add(&chain, nftnl::MsgType::Add);
batch.add(&Rule::new(&chain), nftnl::MsgType::Del);
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept));
batch.add(&rule, nftnl::MsgType::Add);
let finalized_batch = batch.finalize();
send_and_process(&finalized_batch)?;
Ok(())
}
fn send_and_process(batch: &FinalizedBatch) -> std::result::Result<(), Error> {
let seq: u32 = 2;
let socket = mnl::Socket::new(mnl::Bus::Netfilter)?;
socket.send_all(batch)?;
let mut buffer = vec![0; nftnl::nft_nlmsg_maxsize() as usize];
while let Some(message) = socket_recv(&socket, &mut buffer[..])? {
match mnl::cb_run(message, seq, socket.portid())? {
mnl::CbResult::Stop => {
break;
}
mnl::CbResult::Ok => (),
}
}
Ok(())
}
fn socket_recv<'a>(
socket: &mnl::Socket,
buf: &'a mut [u8],
) -> std::result::Result<Option<&'a [u8]>, Error> {
let ret = socket.recv(buf)?;
if ret > 0 {
Ok(Some(&buf[..ret]))
} else {
Ok(None)
}
}

View File

@ -1,28 +0,0 @@
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}
pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
// Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
}
pub async fn _sleep_ms(ms: u64) {
sleep(Duration::from_millis(ms)).await;
}

View File

@ -1,38 +1,34 @@
use crate::ip::{BlockIpData, IpData, IpEvent}; use crate::ip::{BlockIpData, IpData, IpEvent};
use crate::utils::{gethostname, sleep_s}; use crate::utils::{gethostname, sleep_s};
use std::{
collections::HashMap,
hash::{Hash, Hasher},
path::Path,
};
use chrono::prelude::*; use chrono::prelude::*;
use chrono::Duration; use chrono::Duration;
use clap::{Arg, ArgAction, ArgMatches, Command}; use clap::{Arg, ArgAction, ArgMatches, Command};
use git_version::git_version; use git_version::git_version;
use ipnet::IpNet; use ipnet::IpNet;
use nix::sys::inotify::{AddWatchFlags, Inotify, WatchDescriptor}; use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify, WatchDescriptor};
use regex::Regex; use regex::Regex;
use reqwest::{Client, Error as ReqError, Response}; use reqwest::{Client, Error as ReqError, Response};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::path::Path;
pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]); pub const GIT_VERSION: &str = git_version!();
const MASTERSERVER: &str = "ipbl.paulbsd.com"; const MASTERSERVER: &str = "ipbl.paulbsd.com";
const WSSUBSCRIPTION: &str = "ipbl"; const WSSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY_INTERVAL: u64 = 2; const CONFIG_RETRY: u64 = 1;
const WEB_CLIENT_TIMEOUT: i64 = 5; const WEB_CLIENT_TIMEOUT: i64 = 5;
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct Context { pub struct Context {
pub blocklist: HashMap<String, BlockIpData>, pub blocklist: HashMap<String, BlockIpData>,
pub cfg: Config, pub cfg: Config,
pub discovery: Discovery, pub discovery: Discovery,
pub flags: Flags, pub flags: Flags,
pub instance: Box<Inotify>,
pub sas: HashMap<String, SetMap>, pub sas: HashMap<String, SetMap>,
pub hashwd: HashMap<String, WatchDescriptor>, pub hashwd: HashMap<String, WatchDescriptor>,
pub reloadinterval: u64,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -40,20 +36,19 @@ pub struct SetMap {
pub filename: String, pub filename: String,
pub fullpath: String, pub fullpath: String,
pub regex: Regex, pub regex: Regex,
pub set: SetCfg, pub set: Set,
pub watchedfiles: HashMap<String, u64>, pub watchedfiles: HashMap<String, u64>,
pub wd: WatchDescriptor, pub wd: WatchDescriptor,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Flags { pub struct Flags {
#[allow(dead_code)]
pub debug: bool, pub debug: bool,
pub server: String, pub server: String,
} }
impl Context { impl Context {
pub async fn new(inotify: &Inotify) -> Self { pub async fn new() -> Self {
// Get flags // Get flags
let argp: ArgMatches = Context::argparse(); let argp: ArgMatches = Context::argparse();
let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned(); let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned();
@ -68,13 +63,13 @@ impl Context {
urls: HashMap::new(), urls: HashMap::new(),
}, },
sas: HashMap::new(), sas: HashMap::new(),
instance: Box::new(Inotify::init(InitFlags::empty()).unwrap()),
blocklist: HashMap::new(), blocklist: HashMap::new(),
hashwd: HashMap::new(), hashwd: HashMap::new(),
reloadinterval: 5,
}; };
print!("Loading config ... "); print!("Loading config ... ");
ctx.load(&inotify).await.unwrap(); ctx.load().await.unwrap();
ctx ctx
} }
@ -108,17 +103,17 @@ impl Context {
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: Discovery = match req.json().await { let data: Discovery = match req.json().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
Ok(data) Ok(data)
} }
pub async fn load(&mut self, inotify: &Inotify) -> Result<(), Box<dyn std::error::Error>> { pub async fn load(&mut self) -> Result<(), Box<dyn std::error::Error>> {
if cfg!(test) { if cfg!(test) {
return Ok(()); return Ok(());
} }
@ -132,36 +127,17 @@ impl Context {
} }
break; break;
} }
Err(e) => { Err(err) => {
println!("error loading config: {e}, retrying in {CONFIG_RETRY_INTERVAL}s"); println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs");
last_in_err = true; last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await; sleep_s(CONFIG_RETRY).await;
}
};
}
let mut last_in_err = false;
loop {
let res = self.discovery().await;
match res {
Ok(o) => {
self.discovery = o;
if last_in_err {
println!("loaded discovery");
}
break;
}
Err(e) => {
println!("error loading disvoery: {e}, retrying in {CONFIG_RETRY_INTERVAL}s");
last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await;
} }
}; };
} }
if last_in_err { if last_in_err {
println!("creating sas"); println!("creating sas");
} }
self.create_sas(&inotify).await?; self.create_sas().await?;
if last_in_err { if last_in_err {
println!("created sas"); println!("created sas");
} }
@ -169,21 +145,21 @@ impl Context {
} }
#[cfg(test)] #[cfg(test)]
pub async fn get_blocklist_pending(&self) -> Vec<BlockIpData> { pub async fn get_blocklist_pending(&self) -> Vec<IpData> {
let mut res: Vec<BlockIpData> = vec![]; let mut res: Vec<IpData> = vec![];
for (_, ipblock) in self.blocklist.iter() { for (_, v) in self.blocklist.iter() {
res.push(ipblock.clone()); res.push(v.ipdata.clone());
} }
res res
} }
pub async fn get_blocklist_toblock(&self, all: bool) -> Vec<BlockIpData> { pub async fn get_blocklist_toblock(&mut self) -> Vec<IpData> {
let mut res: Vec<BlockIpData> = vec![]; let mut res: Vec<IpData> = vec![];
for (_, ipblock) in self.blocklist.iter() { for (_, block) in self.blocklist.iter_mut() {
match self.cfg.sets.get(&ipblock.ipdata.src) { match self.cfg.sets.get(&block.ipdata.src) {
Some(set) => { Some(set) => {
if ipblock.tryfail >= set.tryfail && (!ipblock.blocked || all) { if block.tryfail >= set.tryfail {
res.push(ipblock.clone()); res.push(block.ipdata.clone());
} }
} }
None => {} None => {}
@ -193,26 +169,21 @@ impl Context {
} }
pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> { pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> {
match &ipevent.ipdata { match self.cfg.sets.get(&ipevent.ipdata.src) {
Some(ipdata) => match self.cfg.sets.get(&ipdata.src) {
Some(set) => { Some(set) => {
let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str()) let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str())
.unwrap() .unwrap()
.with_timezone(&chrono::Local); .with_timezone(&chrono::Local);
let blocktime = set.blocktime; let blocktime = set.blocktime;
let blocked = false;
let handle = u64::MIN;
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname { if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
let block = let block = self
self.blocklist .blocklist
.entry(ipdata.ip.to_string()) .entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData { .or_insert(BlockIpData {
ipdata: ipdata.clone(), ipdata: ipevent.ipdata.clone(),
tryfail: 0, tryfail: 0,
starttime, starttime,
blocktime, blocktime,
blocked,
handle,
}); });
block.tryfail += 1; block.tryfail += 1;
block.blocktime = blocktime; block.blocktime = blocktime;
@ -221,26 +192,22 @@ impl Context {
} }
} else { } else {
self.blocklist self.blocklist
.entry(ipdata.ip.to_string()) .entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData { .or_insert(BlockIpData {
ipdata: ipdata.clone(), ipdata: ipevent.ipdata.clone(),
tryfail: set.tryfail, tryfail: set.tryfail,
starttime, starttime,
blocktime, blocktime,
blocked,
handle,
}); });
} }
} }
None => {} None => {}
},
None => {}
} }
None None
} }
pub async fn gc_blocklist(&mut self) -> Vec<BlockIpData> { pub async fn gc_blocklist(&mut self) -> Vec<IpData> {
let mut removed: Vec<BlockIpData> = vec![]; let mut removed: Vec<IpData> = vec![];
let now: DateTime<Local> = Local::now().trunc_subsecs(0); let now: DateTime<Local> = Local::now().trunc_subsecs(0);
// nightly, future use // nightly, future use
// let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate) // let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate)
@ -255,23 +222,21 @@ impl Context {
let mindate = now - Duration::minutes(blocked.blocktime); let mindate = now - Duration::minutes(blocked.blocktime);
if blocked.starttime < mindate { if blocked.starttime < mindate {
self.blocklist.remove(&ip.clone()).unwrap(); self.blocklist.remove(&ip.clone()).unwrap();
removed.push(blocked.clone()); removed.push(blocked.ipdata.clone());
} }
} }
removed removed
} }
pub async fn create_sas( pub async fn create_sas(&mut self) -> Result<(), Box<dyn std::error::Error>> {
&mut self,
inotify: &Inotify,
) -> Result<(), Box<dyn std::error::Error>> {
for (src, set) in self.cfg.sets.iter() { for (src, set) in self.cfg.sets.iter() {
let p = Path::new(set.path.as_str()); let p = Path::new(set.path.as_str());
if p.is_dir() { if p.is_dir() {
let wd = match self.hashwd.get(&set.path.to_string()) { let wd = match self.hashwd.get(&set.path.to_string()) {
Some(wd) => *wd, Some(wd) => *wd,
None => { None => {
let res = inotify let res = self
.instance
.add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY) .add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY)
.unwrap(); .unwrap();
self.hashwd.insert(set.path.to_string(), res); self.hashwd.insert(set.path.to_string(), res);
@ -317,7 +282,7 @@ impl Context {
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Config { pub struct Config {
pub sets: HashMap<String, SetCfg>, pub sets: HashMap<String, Set>,
#[serde(skip_serializing)] #[serde(skip_serializing)]
pub trustnets: Vec<String>, pub trustnets: Vec<String>,
pub ws: HashMap<String, WebSocketCfg>, pub ws: HashMap<String, WebSocketCfg>,
@ -329,7 +294,7 @@ impl Config {
Self { Self {
sets: HashMap::from([ sets: HashMap::from([
("smtp".to_string(), ("smtp".to_string(),
SetCfg { Set {
src: "smtp".to_string(), src: "smtp".to_string(),
filename: "mail.log".to_string(), filename: "mail.log".to_string(),
regex: "(SASL LOGIN authentication failed)".to_string(), regex: "(SASL LOGIN authentication failed)".to_string(),
@ -338,7 +303,7 @@ impl Config {
tryfail: 5, tryfail: 5,
}), }),
("ssh".to_string(), ("ssh".to_string(),
SetCfg { Set {
src: "ssh".to_string(), src: "ssh".to_string(),
filename: "auth.log".to_string(), filename: "auth.log".to_string(),
regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(), regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(),
@ -347,7 +312,7 @@ impl Config {
tryfail: 5, tryfail: 5,
},), },),
("http".to_string(), ("http".to_string(),
SetCfg { Set {
src: "http".to_string(), src: "http".to_string(),
filename: "".to_string(), filename: "".to_string(),
regex: "(anonymousfox.co)".to_string(), regex: "(anonymousfox.co)".to_string(),
@ -356,7 +321,7 @@ impl Config {
tryfail: 5, tryfail: 5,
},), },),
("openvpn".to_string(), ("openvpn".to_string(),
SetCfg { Set {
src: "openvpn".to_string(), src: "openvpn".to_string(),
filename: "status".to_string(), filename: "status".to_string(),
regex: "(UNDEF)".to_string(), regex: "(UNDEF)".to_string(),
@ -385,61 +350,97 @@ impl Config {
} }
pub async fn load(&mut self, server: &String) -> Result<(), ReqError> { pub async fn load(&mut self, server: &String) -> Result<(), ReqError> {
self.get_config(server).await?; self.get_global_config(server).await?;
self.get_trustnets(server).await?;
self.get_sets(server).await?;
self.get_ws_config(server).await?;
Ok(()) Ok(())
} }
async fn get_config(&mut self, server: &String) -> Result<(), ReqError> { async fn get_global_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> =
httpclient().get(format!("{server}/config")).send().await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: HashMap<String, GlobalConfig> = match req.json::<Vec<GlobalConfig>>().await {
Ok(res) => {
let mut out: HashMap<String, GlobalConfig> = HashMap::new();
res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.key.to_string(), x);
});
out
}
Err(err) => return Err(err),
};
let key = "".to_string();
self.api = data
.get(&key.to_string())
.unwrap_or(&GlobalConfig {
key: "api".to_string(),
value: "127.0.0.1:8060".to_string(),
})
.value
.clone();
Ok(())
}
async fn get_trustnets(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient() let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/config?v=2")) .get(format!("{server}/config/trustlist"))
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await { let data: Vec<String> = match req.json::<Vec<String>>().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
self.trustnets = data;
Ok(())
}
for d in data.sets { async fn get_sets(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/config/sets"))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<Set> = match req.json::<Vec<Set>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
for d in data {
self.sets.insert(d.src.clone(), d); self.sets.insert(d.src.clone(), d);
} }
self.trustnets = data.trustlists;
data.ws.into_iter().map(|x| x).for_each(|x| {
self.ws.insert(x.t.to_string(), x);
});
self.api = data
.cfg
.get(&"api".to_string())
.unwrap_or(&self.api)
.clone();
Ok(()) Ok(())
} }
pub async fn _get_last(server: &String) -> Result<Vec<IpData>, ReqError> { async fn get_ws_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp = httpclient() let resp: Result<Response, ReqError> =
.get(format!("{server}/ips/last")) httpclient().get(format!("{server}/config/ws")).send().await;
.query(&[("interval", "3 hours")])
.send()
.await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: HashMap<String, WebSocketCfg> = match req.json::<Vec<WebSocketCfg>>().await {
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await { Ok(res) => {
Ok(o) => o, let mut out: HashMap<String, WebSocketCfg> = HashMap::new();
Err(e) => return Err(e), res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.t.to_string(), x);
});
out
}
Err(err) => return Err(err),
}; };
self.ws = data;
Ok(data) Ok(())
} }
pub fn build_trustnets(&self) -> Vec<IpNet> { pub fn build_trustnets(&self) -> Vec<IpNet> {
@ -447,8 +448,8 @@ impl Config {
for trustnet in &self.trustnets { for trustnet in &self.trustnets {
match trustnet.parse() { match trustnet.parse() {
Ok(net) => trustnets.push(net), Ok(net) => trustnets.push(net),
Err(e) => { Err(err) => {
println!("error parsing {trustnet}, error: {e}"); println!("error parsing {trustnet}, error: {err}");
} }
}; };
} }
@ -460,7 +461,12 @@ impl Config {
msgtype: String::from("bootstrap"), msgtype: String::from("bootstrap"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: gethostname(true), hostname: gethostname(true),
ipdata: None, ipdata: IpData {
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
},
} }
} }
} }
@ -481,15 +487,13 @@ pub fn httpclient() -> Client {
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct GlobalConfigV2 { pub struct GlobalConfig {
pub cfg: HashMap<String, String>, pub key: String,
pub sets: Vec<SetCfg>, pub value: String,
pub trustlists: Vec<String>,
pub ws: Vec<WebSocketCfg>,
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SetCfg { pub struct Set {
pub src: String, pub src: String,
pub filename: String, pub filename: String,
pub regex: String, pub regex: String,
@ -518,13 +522,13 @@ pub struct URL {
pub path: String, pub path: String,
} }
impl PartialEq for SetCfg { impl PartialEq for Set {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.src == other.src self.src == other.src
} }
} }
impl Hash for SetCfg { impl Hash for Set {
fn hash<H: Hasher>(&self, state: &mut H) { fn hash<H: Hasher>(&self, state: &mut H) {
self.src.hash(state); self.src.hash(state);
} }
@ -534,12 +538,10 @@ impl Hash for SetCfg {
mod test { mod test {
use super::*; use super::*;
use crate::ip::*; use crate::ip::*;
use nix::sys::inotify::InitFlags;
use Context; use Context;
pub async fn prepare_test_data() -> Context { pub async fn prepare_test_data() -> Context {
let inotify = Inotify::init(InitFlags::empty()).unwrap(); let mut ctx = Context::new().await;
let mut ctx = Context::new(&inotify).await;
let now: DateTime<Local> = Local::now().trunc_subsecs(0); let now: DateTime<Local> = Local::now().trunc_subsecs(0);
ctx.blocklist = HashMap::new(); ctx.blocklist = HashMap::new();
@ -548,13 +550,12 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4,
ip: "1.1.1.1".to_string(), ip: "1.1.1.1".to_string(),
hostname: "test1".to_string(), hostname: "test1".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "ssh".to_string(), src: "ssh".to_string(),
}), },
}) })
.await; .await;
} }
@ -564,13 +565,12 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4,
ip: "1.1.1.2".to_string(), ip: "1.1.1.2".to_string(),
hostname: "test2".to_string(), hostname: "test2".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
} }
@ -579,13 +579,12 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4,
ip: "1.1.1.3".to_string(), ip: "1.1.1.3".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
@ -593,13 +592,12 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
}) })
.await; .await;
@ -607,33 +605,19 @@ mod test {
msgtype: String::from("add"), msgtype: String::from("add"),
mode: String::from("ws"), mode: String::from("ws"),
hostname: String::from("localhost"), hostname: String::from("localhost"),
ipdata: Some(IpData { ipdata: IpData {
t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), },
})
.await;
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 6,
ip: "2a00:1450:4007:805::2003".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
}) })
.await; .await;
let ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap(); let mut ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap();
ip1.starttime = DateTime::from(now) - Duration::minutes(61); ip1.starttime = DateTime::from(now) - Duration::minutes(61);
let ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap(); let mut ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap();
ip2.starttime = DateTime::from(now) - Duration::minutes(62); ip2.starttime = DateTime::from(now) - Duration::minutes(62);
ctx ctx
} }
@ -643,14 +627,8 @@ mod test {
let ctx = prepare_test_data().await; let ctx = prepare_test_data().await;
let pending = ctx.get_blocklist_pending().await; let pending = ctx.get_blocklist_pending().await;
assert_eq!(pending.len(), 5); assert_eq!(pending.len(), 4);
let ips = [ let ips = ["1.1.1.1", "1.1.1.2", "1.1.1.3", "1.1.1.4"];
"1.1.1.1",
"1.1.1.2",
"1.1.1.3",
"1.1.1.4",
"2a00:1450:4007:805::2003",
];
for i in ips { for i in ips {
let ip = ctx let ip = ctx
.blocklist .blocklist
@ -667,8 +645,8 @@ mod test {
pub async fn test_blocklist_toblock() { pub async fn test_blocklist_toblock() {
let mut ctx = prepare_test_data().await; let mut ctx = prepare_test_data().await;
ctx.gc_blocklist().await; ctx.gc_blocklist().await;
let toblock = ctx.get_blocklist_toblock(false).await; let toblock = ctx.get_blocklist_toblock().await;
assert_eq!(toblock.len(), 3); assert_eq!(toblock.len(), 2);
} }
#[tokio::test] #[tokio::test]

280
src/fw.rs
View File

@ -1,208 +1,104 @@
use crate::{config::Context, ip::BlockIpData, ipblc::PKG_NAME}; use crate::ip::IpData;
use crate::ipblc::PKG_NAME;
use std::{ use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
io::Error, use std::{ffi::CString, io::Error, net::Ipv4Addr};
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::Arc,
};
use tokio::sync::RwLock; pub fn fwinit() -> (Batch, Table) {
use rustables::{expr::*, *};
pub enum FwTableType {
IPv4,
IPv6,
}
#[allow(dead_code)]
pub enum FwAction {
Add,
Delete,
}
macro_rules! initrules {
($batch:expr, $table:expr, $chain:ident, $reset:expr) => {
$chain.set_hook(Hook::new(HookClass::In, 1));
$batch.add(&$chain, MsgType::Add);
if $reset {
$batch.add(&Rule::new(&$chain).unwrap(), MsgType::Del);
}
};
}
macro_rules! makerules {
($ipdata:ident, $chain:ident, $batch:ident, $t:ty, $ip_t:ident,$action:ty) => {
let ip = $ipdata.ipdata.ip.parse::<$t>().unwrap();
Rule::new(&$chain)
.unwrap()
.saddr(ip.into())
.drop()
.add_to_batch(&mut $batch);
};
}
pub fn fwglobalinit(t: FwTableType, reset: bool) -> (Batch, Chain) {
let table_name: String;
let table: Table;
let mut chain: Chain;
match t {
FwTableType::IPv4 => {
table_name = format!("{PKG_NAME}4");
table = Table::new(ProtocolFamily::Ipv4).with_name(table_name);
chain = Chain::new(&table)
.with_policy(ChainPolicy::Accept)
.with_name(PKG_NAME);
}
FwTableType::IPv6 => {
table_name = format!("{PKG_NAME}6");
table = Table::new(ProtocolFamily::Ipv6).with_name(table_name);
chain = Chain::new(&table)
.with_policy(ChainPolicy::Accept)
.with_name(PKG_NAME);
}
}
let mut batch = Batch::new(); let mut batch = Batch::new();
batch.add(&table, MsgType::Add); let table = Table::new(&CString::new(PKG_NAME).unwrap(), ProtoFamily::Ipv4);
initrules!(batch, table, chain, reset);
(batch, chain) batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del);
batch.add(&table, nftnl::MsgType::Add);
(batch, table)
} }
pub fn fwblock<'a>(ip_add: &BlockIpData) -> std::result::Result<&String, error::QueryError> { pub fn fwblock(
let (mut batch4, chain4) = fwglobalinit(FwTableType::IPv4, false); ips_add: &Vec<IpData>,
let (mut batch6, chain6) = fwglobalinit(FwTableType::IPv6, false);
match ip_add.ipdata.t {
4 => {
makerules!(ip_add, chain4, batch4, Ipv4Addr, ipv4, FwAction::Add);
match batch4.send() {
Ok(_) => {}
Err(e) => {
println!("block not ok {e} {ip_add:?}")
}
}
}
6 => {
makerules!(ip_add, chain6, batch6, Ipv6Addr, ipv6, FwAction::Add);
match batch6.send() {
Ok(_) => {}
Err(e) => {
println!("block not ok {e} {ip_add:?}")
}
}
}
_ => {}
}
Ok(&ip_add.ipdata.ip)
}
pub fn fwunblock<'a>(ip_del: &BlockIpData) -> std::result::Result<&String, error::QueryError> {
let (mut batch4, chain4) = fwglobalinit(FwTableType::IPv4, false);
let (mut batch6, chain6) = fwglobalinit(FwTableType::IPv6, false);
match ip_del.ipdata.t {
4 => {
let r = Rule::new(&chain4).unwrap().with_handle(ip_del.handle);
batch4.add(&r, MsgType::Del);
match batch4.send() {
Ok(_) => {}
Err(e) => {
println!("delete not ok {e} {ip_del:?}")
}
}
}
6 => {
let r = Rule::new(&chain6).unwrap().with_handle(ip_del.handle);
batch6.add(&r, MsgType::Del);
match batch6.send() {
Ok(_) => {}
Err(e) => {
println!("delete not ok {e} {ip_del:?}")
}
}
}
_ => {}
}
Ok(&ip_del.ipdata.ip)
}
pub async fn get_current_rules(
ctx: &Arc<RwLock<Context>>,
ret: &mut Vec<String>, ret: &mut Vec<String>,
fwlen: &mut usize, fwlen: &mut usize,
) -> Result<(), Error> { ) -> std::result::Result<(), Error> {
let mut ips_all_count = 0; // convert chain
let tables = vec![format!("{PKG_NAME}4"), format!("{PKG_NAME}6")]; let ips_add = convert(ips_add);
for table_name in tables { let (mut batch, table) = fwinit();
let get_table = || -> Result<Option<Table>, Error> {
let tables = list_tables().unwrap(); // build chain
for table in tables { let mut chain = Chain::new(&CString::new(PKG_NAME).unwrap(), &table);
if let Some(name) = table.get_name() { chain.set_hook(nftnl::Hook::In, 1);
if *name == table_name { chain.set_policy(nftnl::Policy::Accept);
return Ok(Some(table));
} // add chain
} batch.add(&chain, nftnl::MsgType::Add);
let rule = Rule::new(&chain);
batch.add(&rule, nftnl::MsgType::Del);
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept));
batch.add(&rule, nftnl::MsgType::Add);
// build and add rules
for ip in ips_add.clone() {
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(payload ipv4 saddr));
rule.add_expr(&nft_expr!(cmp == ip));
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict drop));
batch.add(&rule, nftnl::MsgType::Add);
} }
Ok(None) // validate and send batch
}; let finalized_batch = batch.finalize();
send_and_process(&finalized_batch)?;
let get_chain = |table: &Table| -> Result<Option<Chain>, Error> { if fwlen != &mut ips_add.len() {
let chains = list_chains_for_table(table).unwrap(); ret.push(format!("{length} ip in firewall", length = ips_add.len()));
for chain in chains {
if let Some(name) = chain.get_name() {
if *name == "ipblc" {
return Ok(Some(chain));
} }
} *fwlen = ips_add.len();
}
Ok(None)
};
let table = get_table()?.expect("no table?");
let chain = get_chain(&table)?.expect("no chain?");
let mut ctx = { ctx.write().await };
let rules = list_rules_for_chain(&chain).unwrap().clone();
for (ip, c) in ctx.blocklist.iter_mut() {
let ip_parsed: IpAddr = ip.parse().unwrap();
let cmprule = Rule::new(&chain).unwrap().saddr(ip_parsed).drop();
let mut gexpr = RawExpression::default();
for e in cmprule.get_expressions().unwrap().iter() {
if let Some(ExpressionVariant::Cmp(_ip)) = e.get_data() {
gexpr = e.clone();
}
}
for rule in rules.iter() {
for expr in rule.get_expressions().unwrap().iter() {
if let Some(expr::ExpressionVariant::Cmp(_)) = expr.get_data() {
if gexpr == expr.clone() {
ips_all_count += 1;
c.handle = *rule.get_handle().unwrap();
}
}
}
}
}
}
if *fwlen != ips_all_count {
ret.push(format!("{length} ip in firewall", length = ips_all_count));
}
*fwlen = ips_all_count;
Ok(()) Ok(())
} }
#[allow(dead_code)] fn send_and_process(batch: &FinalizedBatch) -> std::result::Result<(), Error> {
fn fw_rules_count() -> i64 { let seq: u32 = 2;
0 let socket = mnl::Socket::new(mnl::Bus::Netfilter)?;
socket.send_all(batch)?;
let mut buffer = vec![0; nftnl::nft_nlmsg_maxsize() as usize];
while let Some(message) = socket_recv(&socket, &mut buffer[..])? {
match mnl::cb_run(message, seq, socket.portid())? {
mnl::CbResult::Stop => {
break;
}
mnl::CbResult::Ok => (),
}
}
Ok(())
}
fn socket_recv<'a>(
socket: &mnl::Socket,
buf: &'a mut [u8],
) -> std::result::Result<Option<&'a [u8]>, Error> {
let ret = socket.recv(buf)?;
if ret > 0 {
Ok(Some(&buf[..ret]))
} else {
Ok(None)
}
}
fn convert(input: &Vec<IpData>) -> Vec<Ipv4Addr> {
let mut output: Vec<Ipv4Addr> = vec![];
for val in input {
output.push(val.ip.parse::<Ipv4Addr>().unwrap());
}
output
} }

147
src/ip.rs
View File

@ -1,18 +1,16 @@
use crate::config::httpclient;
use crate::utils::gethostname; use crate::utils::gethostname;
use std::{
cmp::Ordering,
fmt::{Display, Formatter},
io::{BufRead, BufReader, Read},
net::IpAddr,
};
use chrono::offset::LocalResult;
use chrono::prelude::*; use chrono::prelude::*;
use ipnet::IpNet; use ipnet::IpNet;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use reqwest::Error as ReqError;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::io::{BufRead, BufReader, Read};
use std::net::IpAddr;
lazy_static! { lazy_static! {
static ref R_IPV4: Regex = Regex::new(include_str!("regexps/ipv4.txt")).unwrap(); static ref R_IPV4: Regex = Regex::new(include_str!("regexps/ipv4.txt")).unwrap();
@ -25,27 +23,15 @@ pub struct IpEvent {
pub msgtype: String, pub msgtype: String,
pub mode: String, pub mode: String,
pub hostname: String, pub hostname: String,
pub ipdata: Option<IpData>, pub ipdata: IpData,
} }
#[macro_export] #[derive(Clone, Debug, Serialize, Deserialize, Eq)]
macro_rules! ipevent { pub struct IpData {
($msgtype:expr,$mode:expr,$hostname:expr,$ipdata:expr) => { pub ip: String,
IpEvent { pub src: String,
msgtype: String::from($msgtype), pub date: String,
mode: String::from($mode), pub hostname: String,
hostname: $hostname,
ipdata: $ipdata,
}
};
($msgtype:expr,$mode:expr,$hostname:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: None,
}
};
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -54,30 +40,6 @@ pub struct BlockIpData {
pub tryfail: i64, pub tryfail: i64,
pub blocktime: i64, pub blocktime: i64,
pub starttime: DateTime<Local>, pub starttime: DateTime<Local>,
pub blocked: bool,
pub handle: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq)]
pub struct IpData {
pub t: isize,
pub ip: String,
pub src: String,
pub date: String,
pub hostname: String,
}
#[macro_export]
macro_rules! ipdata {
($t:expr,$ip:expr,$src:expr,$date:expr,$hostname:expr) => {
IpData {
t: $t.clone(),
ip: $ip.clone(),
src: $src.clone(),
date: $date.clone(),
hostname: $hostname.clone(),
}
};
} }
impl PartialEq for IpData { impl PartialEq for IpData {
@ -115,7 +77,7 @@ impl Display for IpData {
} }
pub fn filter( pub fn filter(
reader: Box<dyn Read>, lines: Box<dyn Read>,
iplist: &mut Vec<IpData>, iplist: &mut Vec<IpData>,
trustnets: &Vec<IpNet>, trustnets: &Vec<IpNet>,
regex: &Regex, regex: &Regex,
@ -124,23 +86,19 @@ pub fn filter(
) -> isize { ) -> isize {
let mut ips = 0; let mut ips = 0;
let hostname = gethostname(true); let hostname = gethostname(true);
let lines = BufReader::new(reader).lines(); for line in BufReader::new(lines).lines() {
for line in lines.into_iter() {
if let Ok(l) = line { if let Ok(l) = line {
if regex.is_match(l.as_str()) { if regex.is_match(l.as_str()) {
let s_ipaddr: String; let s_ipaddr: String;
let t: isize;
match R_IPV4.captures(l.as_str()) { match R_IPV4.captures(l.as_str()) {
Some(sv4) => { Some(sv4) => {
s_ipaddr = sv4.get(0).unwrap().as_str().to_string(); s_ipaddr = sv4.get(0).unwrap().as_str().to_string();
t = 4;
} }
None => { None => {
match R_IPV6.captures(l.as_str()) { match R_IPV6.captures(l.as_str()) {
Some(sv6) => { Some(sv6) => {
s_ipaddr = sv6.get(0).unwrap().as_str().to_string(); s_ipaddr = sv6.get(0).unwrap().as_str().to_string();
t = 6;
} }
None => { None => {
continue; continue;
@ -149,14 +107,6 @@ pub fn filter(
} }
}; };
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
let s_date: DateTime<Local>; let s_date: DateTime<Local>;
match R_DATE.captures(l.as_str()) { match R_DATE.captures(l.as_str()) {
Some(sdt) => { Some(sdt) => {
@ -170,8 +120,21 @@ pub fn filter(
} }
}; };
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
if !is_trusted(&ipaddr, &trustnets) { if !is_trusted(&ipaddr, &trustnets) {
iplist.push(ipdata!(t, s_ipaddr, src, s_date.to_rfc3339(), hostname)); iplist.push(IpData {
ip: s_ipaddr,
src: src.to_owned(),
date: s_date.to_rfc3339().to_owned(),
hostname: hostname.to_owned(),
});
ips += 1; ips += 1;
}; };
} }
@ -181,24 +144,28 @@ pub fn filter(
} }
fn parse_date(input: regex::Captures) -> DateTime<Local> { fn parse_date(input: regex::Captures) -> DateTime<Local> {
let mut ymd: Vec<u32> = vec![]; let mut ymd: Vec<u64> = vec![];
let mut hms: Vec<u32> = vec![]; let mut hms: Vec<u64> = vec![];
let ymd_range = 2..5;
let hms_range = 5..8;
for cap in ymd_range { let (daterange, hourrange) = (2..5, 5..8);
ymd.push(input.get(cap).unwrap().as_str().parse::<u32>().unwrap());
for i in daterange {
ymd.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
} }
for cap in hms_range { for i in hourrange {
hms.push(input.get(cap).unwrap().as_str().parse::<u32>().unwrap()); hms.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
} }
let date: DateTime<Local> = let date = Local
match Local.with_ymd_and_hms(ymd[0] as i32, ymd[1], ymd[2], hms[0], hms[1], hms[2]) { .with_ymd_and_hms(
LocalResult::Single(s) => s, ymd[0] as i32,
LocalResult::Ambiguous(a, _b) => a, ymd[1] as u32,
LocalResult::None => Local::now().trunc_subsecs(0), ymd[2] as u32,
}; hms[0] as u32,
hms[1] as u32,
hms[2] as u32,
)
.unwrap();
date date
} }
@ -210,3 +177,23 @@ fn is_trusted(ip: &IpAddr, trustnets: &Vec<IpNet>) -> bool {
} }
false false
} }
pub async fn _get_last(server: &String) -> Result<Vec<IpData>, ReqError> {
let resp = httpclient()
.get(format!("{server}/ips/last"))
.query(&[("interval", "3 hours")])
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
}

View File

@ -1,48 +1,35 @@
use crate::config::{Context, GIT_VERSION}; use crate::config::{Context, GIT_VERSION};
use crate::fw::*; use crate::fw::{fwblock, fwinit};
use crate::ip::{filter, IpData, IpEvent}; use crate::ip::{filter, IpData, IpEvent};
use crate::ipevent;
use crate::monitoring::apiserver; use crate::monitoring::apiserver;
use crate::utils::{gethostname, read_lines, sleep_s}; use crate::utils::{gethostname, read_lines, sleep_ms};
use crate::webservice::send_to_ipbl_api; use crate::webservice::send_to_ipbl_api;
use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep}; use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep};
use std::{collections::HashMap, sync::Arc};
use chrono::prelude::*; use chrono::prelude::*;
use chrono::prelude::{DateTime, Local}; use chrono::prelude::{DateTime, Local};
use chrono::Duration; use chrono::Duration;
use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent}; use nix::sys::inotify::InotifyEvent;
use sd_notify::*; use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock; use tokio::sync::RwLock;
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
const BL_CHAN_SIZE: usize = 32; const BL_CHAN_SIZE: usize = 32;
const WS_CHAN_SIZE: usize = 64; const WS_CHAN_SIZE: usize = 64;
const LOOPSLEEP: u64 = 500;
macro_rules! log_with_systemd {
($msg:expr) => {
println!("{}", $msg);
notify(false, &[NotifyState::Status(format!("{}", $msg).as_str())]).unwrap();
};
}
pub async fn run() { pub async fn run() {
let inotify = Inotify::init(InitFlags::empty()).unwrap(); let globalctx = Context::new().await;
let globalctx = Context::new(&inotify).await;
let ctxarc = Arc::new(RwLock::new(globalctx)); let ctxarc = Arc::new(RwLock::new(globalctx));
let (batch4, _) = fwglobalinit(FwTableType::IPv4, true);
let (batch6, _) = fwglobalinit(FwTableType::IPv6, true);
batch4.send().unwrap();
batch6.send().unwrap();
let mut fwlen: usize = 0; let mut fwlen: usize = 0;
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0); let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
log_with_systemd!(format!("Launching {}, version {}", PKG_NAME, pkgversion)); println!("Launching {}, version {}", PKG_NAME, pkgversion);
fwinit();
let ctxapi = Arc::clone(&ctxarc); let ctxapi = Arc::clone(&ctxarc);
apiserver(&ctxapi).await.unwrap(); apiserver(&ctxapi).await.unwrap();
@ -60,9 +47,7 @@ pub async fn run() {
let mut wssocketrr = websocketreqrep(&ctxwsrr).await; let mut wssocketrr = websocketreqrep(&ctxwsrr).await;
// init file watcher // init file watcher
let inoarc = Arc::new(RwLock::new(inotify)); let mut blrx = watchfiles(&ctxarc).await;
let inoclone = Arc::clone(&inoarc);
let mut blrx = watchfiles(inoclone).await;
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
let ipeventclone = Arc::clone(&ipeventtxarc); let ipeventclone = Arc::clone(&ipeventtxarc);
@ -70,32 +55,31 @@ pub async fn run() {
compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await; compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
}); });
notify(false, &[NotifyState::Ready]).unwrap();
loop { loop {
let mut ret: Vec<String> = Vec::new(); let mut ret: Vec<String> = Vec::new();
let ctxclone = Arc::clone(&ctxarc);
let reloadinterval; let ctxclone = Arc::clone(&ctxarc);
{
let ctx = ctxclone.read().await;
reloadinterval = ctx.reloadinterval;
}
tokio::select! { tokio::select! {
ipevent = ipeventrx.recv() => { ipevent = ipeventrx.recv() => {
let received_ip = ipevent.unwrap(); let received_ip = ipevent.unwrap();
let (toblock,server) = { let (toblock,server);
let ctx = ctxclone.read().await; {
(ctx.get_blocklist_toblock(true).await,ctx.flags.server.clone()) let mut ctx = ctxclone.write().await;
}; toblock = ctx.get_blocklist_toblock().await;
server = ctx.flags.server.clone();
}
if received_ip.msgtype == "bootstrap".to_string() { if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in toblock { for ip_to_send in toblock {
let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send.ipdata)); let ipe = IpEvent{
msgtype: String::from("init"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: ip_to_send,
};
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
break; break;
} }
@ -104,112 +88,52 @@ pub async fn run() {
} }
// refresh context blocklist // refresh context blocklist
let filtered_ipevent = { let filtered_ipevent;
ctxarc.write().await.update_blocklist(&received_ip).await {
}; let mut ctx = ctxarc.write().await;
filtered_ipevent = ctx.update_blocklist(&received_ip).await;
}
// send ip list to api and ws sockets // send ip list to api and ws sockets
if let Some(ipevent) = filtered_ipevent { if let Some(ipevent) = filtered_ipevent {
if received_ip.msgtype != "init" { if received_ip.msgtype != "init" {
log_with_systemd!(format!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip)); println!("sending {} to api and ws", ipevent.ipdata.ip);
let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata); let ipe = IpEvent{
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: ipevent.ipdata,
};
send_to_ipbl_api(&server.clone(), &ipe).await; send_to_ipbl_api(&server.clone(), &ipe).await;
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await { let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
wssocketrr.close(None).unwrap(); if !status {
wssocketrr = websocketreqrep(&ctxwsrr).await; wssocketrr = websocketreqrep(&ctxwsrr).await;
continue; continue;
} }
} }
} }
} }
_val = sleep_s(reloadinterval) => { _val = sleep_ms(LOOPSLEEP) => {}
let ipe = ipevent!("ping", "ws", gethostname(true));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
}
}
}; };
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
let ipstounblock = { handle_fwblock(ctxclone, &mut ret, &mut fwlen).await;
let mut ctx = ctxclone.write().await;
ctx.gc_blocklist().await
};
let ipstoblock = {
let ctx = ctxclone.read().await;
ctx.get_blocklist_toblock(false).await
};
get_current_rules(&ctxarc, &mut ret, &mut fwlen)
.await
.unwrap();
for ip in ipstoblock {
match fwblock(&ip) {
Ok(ip) => {
let mut ctx = ctxclone.write().await;
if let Some(x) = ctx.blocklist.get_mut(ip) {
x.blocked = true;
}
}
Err(e) => {
println!("err: {e}, unable to push firewall rules, use super user")
}
};
}
for ip in ipstounblock {
if ip.blocked {
match fwunblock(&ip) {
Ok(_) => {}
Err(e) => {
println!("err: {e}, unable to push firewall rules, use super user")
}
};
}
}
// log lines // log lines
if ret.len() > 0 { if ret.len() > 0 {
let result = ret.join(", "); println!("{ret}", ret = ret.join(", "));
log_with_systemd!(format!("{result}"));
} }
let ctxclone = Arc::clone(&ctxarc); let ctxclone = Arc::clone(&ctxarc);
let inoclone = Arc::clone(&inoarc); handle_cfg_reload(&ctxclone, &mut last_cfg_reload).await;
handle_cfg_reload(&ctxclone, reloadinterval, &mut last_cfg_reload, inoclone).await;
} }
} }
async fn handle_cfg_reload( async fn handle_cfg_reload(ctxclone: &Arc<RwLock<Context>>, last_cfg_reload: &mut DateTime<Local>) {
ctxclone: &Arc<RwLock<Context>>,
reloadinterval: u64,
last_cfg_reload: &mut DateTime<Local>,
inoarc: Arc<RwLock<Inotify>>,
) {
let now_cfg_reload = Local::now().trunc_subsecs(0); let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(reloadinterval as i64) { if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(5) {
let inotify; let mut ctx = ctxclone.write().await;
loop { match ctx.load().await {
inotify = match inoarc.try_read() {
Ok(o) => o,
Err(e) => {
println!("{e}");
sleep_s(1).await;
continue;
}
};
break;
}
let mut ctx = match ctxclone.try_write() {
Ok(o) => o,
Err(e) => {
println!("{e}");
return;
}
};
match ctx.load(&inotify).await {
Ok(_) => { Ok(_) => {
*last_cfg_reload = Local::now().trunc_subsecs(0); *last_cfg_reload = Local::now().trunc_subsecs(0);
} }
@ -220,11 +144,36 @@ async fn handle_cfg_reload(
}; };
} }
async fn watchfiles(inoarc: Arc<RwLock<Inotify>>) -> Receiver<FileEvent> { async fn handle_fwblock(ctxclone: Arc<RwLock<Context>>, ret: &mut Vec<String>, fwlen: &mut usize) {
let toblock;
{
let mut ctx = ctxclone.write().await;
ctx.gc_blocklist().await;
toblock = ctx.get_blocklist_toblock().await;
// apply firewall blocking
match fwblock(&toblock, ret, fwlen) {
Ok(_) => {}
Err(err) => {
println!("Err: {err}, unable to push firewall rules, use super user")
}
};
}
}
async fn watchfiles(ctxarc: &Arc<RwLock<Context>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE); let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
let ctxclone = Arc::clone(ctxarc);
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let events = inoarc.read().await.read_events().unwrap(); let events;
let instance;
{
let ctx = ctxclone.read().await;
instance = ctx.instance.clone();
}
events = instance.read_events().unwrap();
for inevent in events { for inevent in events {
let date: DateTime<Local> = Local::now().trunc_subsecs(0); let date: DateTime<Local> = Local::now().trunc_subsecs(0);
@ -237,12 +186,12 @@ async fn watchfiles(inoarc: Arc<RwLock<Inotify>>) -> Receiver<FileEvent> {
async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, bool) { async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, bool) {
let currentlen = match std::fs::metadata(&path.to_string()) { let currentlen = match std::fs::metadata(&path.to_string()) {
Ok(u) => u.len(), Ok(u) => u.len().clone(),
Err(_) => 0u64, Err(_) => 0u64,
}; };
let lastlen = match w.insert(path.to_string(), currentlen) { let lastlen = match w.insert(path.to_string(), currentlen) {
Some(u) => u, Some(u) => u,
None => currentlen, None => 0u64,
}; };
(lastlen, lastlen != currentlen) (lastlen, lastlen != currentlen)
} }
@ -257,16 +206,20 @@ async fn compare_files_changes(
let modfiles = inrx.recv().await.unwrap(); let modfiles = inrx.recv().await.unwrap();
let mut iplist: Vec<IpData> = vec![]; let mut iplist: Vec<IpData> = vec![];
let sas = { let sask;
let sas;
{
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
sas = ctx.clone().sas;
sask = sas.keys();
tnets = ctx.cfg.build_trustnets(); tnets = ctx.cfg.build_trustnets();
ctx.sas.clone() }
};
match modfiles.inevent.name { match modfiles.inevent.name {
Some(name) => { Some(name) => {
let filename = name.to_str().unwrap(); let filename = name.to_str().unwrap();
for (sak, sa) in sas.clone().iter_mut() { for sak in sask {
let sa = sas.get(sak).unwrap();
if modfiles.inevent.wd == sa.wd { if modfiles.inevent.wd == sa.wd {
let handle: String; let handle: String;
if sa.filename.as_str() == "" { if sa.filename.as_str() == "" {
@ -277,11 +230,13 @@ async fn compare_files_changes(
continue; continue;
} }
let (filesize, sizechanged) = { let (filesize, sizechanged);
{
let mut ctx = ctxarc.write().await; let mut ctx = ctxarc.write().await;
let sa = ctx.sas.get_mut(sak).unwrap(); let sa = ctx.sas.get_mut(sak).unwrap();
get_last_file_size(&mut sa.watchedfiles, &handle).await (filesize, sizechanged) =
}; get_last_file_size(&mut sa.watchedfiles, &handle).await;
}
if !sizechanged { if !sizechanged {
continue; continue;
@ -304,9 +259,14 @@ async fn compare_files_changes(
} }
} }
for ip in iplist { for ip in iplist {
let ipe = ipevent!("add", "file", gethostname(true), Some(ip)); let ipevent = IpEvent {
let ipetx = ipeventtx.read().await; msgtype: String::from("add"),
ipetx.send(ipe).await.unwrap(); hostname: gethostname(true),
mode: String::from("file"),
ipdata: ip,
};
let ipetx = ipeventtx.write().await;
ipetx.send(ipevent).await.unwrap();
} }
} }
None => {} None => {}

View File

@ -1,75 +1,57 @@
use crate::config::Context; use crate::config::Context;
use std::{io, sync::Arc};
use serde_json; use serde_json;
use std::io;
use std::sync::Arc;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener; use tokio::net::TcpSocket;
use tokio::sync::RwLock; use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> { pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxarc = ctxarc.clone(); let ctxarc = ctxarc.clone();
let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() }; let addr;
let listener = match TcpListener::bind(addr).await { {
Ok(o) => o, let ctx = ctxarc.read().await;
Err(e) => { addr = ctx.cfg.api.parse().unwrap();
println!("error: {e}");
std::process::exit(1);
} }
};
let socket = TcpSocket::new_v4().unwrap();
socket.bind(addr).unwrap();
socket.set_reuseaddr(true).unwrap();
let listener = socket.listen(1024).unwrap();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
//apitx.send(String::from("")).await.unwrap();
match listener.accept().await { match listener.accept().await {
Ok((mut socket, _addr)) => { Ok((stream, _addr)) => {
let mut buf = vec![0; 1024]; //let mut buf = [0; 1024];
let data;
{
let ctx = ctxarc.read().await;
data = serde_json::to_string(&ctx.blocklist);
}
match socket.readable().await { match data {
Ok(_) => { Ok(dt) => {
match socket.try_read(&mut buf) { let (_reader, mut writer) = stream.into_split();
match writer.write_all(format!("{dt}").as_bytes()).await {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
continue;
}
};
}
Err(e) => {
println!("error: {e}");
continue;
}
}
let msg = match String::from_utf8(buf.to_vec()) {
Ok(o) => o.trim_matches(char::from(0)).trim().to_string(),
Err(_) => "".to_string(),
};
let res = format_result(&ctxarc, msg.as_str()).await;
match socket.write_all(res.as_bytes()).await {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
} }
} }
} }
Err(err) => { Err(err) => {
println!("error: {err}"); println!("unable to serialize data: {err}");
}
}
}
Err(err) => {
println!("couldn't get client: {}", err)
} }
} }
} }
}); });
Ok(()) Ok(())
} }
async fn format_result(ctxarc: &Arc<RwLock<Context>>, mode: &str) -> String {
let data;
let ctx = ctxarc.read().await;
match mode {
"cfg" => data = serde_json::to_string(&ctx.cfg).unwrap(),
"blocklist" => data = serde_json::to_string(&ctx.blocklist).unwrap(),
_ => data = serde_json::to_string(&ctx.blocklist).unwrap(),
};
data
}

View File

@ -1 +1 @@
(((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*)|(((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?) ((^\s*((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*$)|(^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$))

View File

@ -1,13 +1,21 @@
use std::{boxed::Box, fs::File, io::*}; use lazy_static::lazy_static;
use nix::unistd; use nix::unistd;
use regex::Regex;
use std::boxed::Box;
use std::fs::File;
use std::io::*;
use std::path::Path;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
lazy_static! {
static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap();
}
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> { pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
let mut file = match File::open(filename) { let mut file = match File::open(filename) {
Ok(o) => o, Ok(f) => f,
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
return None; return None;
} }
}; };
@ -16,13 +24,21 @@ pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
Some(lines) Some(lines)
} }
pub async fn sleep_s(s: u64) { pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
sleep(Duration::from_secs(s)).await; // Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
} }
#[allow(dead_code)] pub async fn sleep_ms(ms: u64) {
pub async fn sleep_ms(m: u64) { sleep(Duration::from_millis(ms)).await;
sleep(Duration::from_millis(m)).await; }
pub async fn sleep_s(s: u64) {
sleep(Duration::from_secs(s)).await;
} }
pub fn gethostname(show_fqdn: bool) -> String { pub fn gethostname(show_fqdn: bool) -> String {
@ -37,3 +53,19 @@ pub fn gethostname(show_fqdn: bool) -> String {
} }
hostname[0].to_string() hostname[0].to_string()
} }
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}

View File

@ -11,12 +11,12 @@ pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut try_req = 0; let mut try_req = 0;
let client = httpclient(); let client = httpclient();
loop { loop {
match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await { match push_ip(&client, &server, &ip.ipdata).await {
Ok(_) => { Ok(_) => {
break; break;
} }
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
sleep_s(1).await; sleep_s(1).await;
if try_req == MAX_FAILED_API_RATE { if try_req == MAX_FAILED_API_RATE {
break; break;
@ -31,7 +31,6 @@ async fn push_ip(client: &Client, server: &str, ip: &IpData) -> Result<(), ReqEr
let mut data: Vec<IpData> = vec![]; let mut data: Vec<IpData> = vec![];
data.push(IpData { data.push(IpData {
t: ip.t,
ip: ip.ip.to_string(), ip: ip.ip.to_string(),
src: ip.src.to_string(), src: ip.src.to_string(),
date: ip.date.to_string(), date: ip.date.to_string(),
@ -57,7 +56,6 @@ async fn _push_ip_bulk(
for ip in ips { for ip in ips {
data.push(IpData { data.push(IpData {
t: ip.t,
ip: ip.ip.to_string(), ip: ip.ip.to_string(),
src: ip.src.to_string(), src: ip.src.to_string(),
date: ip.date.to_string(), date: ip.date.to_string(),

View File

@ -2,13 +2,10 @@ use crate::config::{Context, WebSocketCfg};
use crate::ip::IpEvent; use crate::ip::IpEvent;
use crate::utils::{gethostname, sleep_s}; use crate::utils::{gethostname, sleep_s};
use std::{
io::{self, Write},
net::TcpStream,
sync::Arc,
};
use serde_json::json; use serde_json::json;
use std::io::{self, Write};
use std::net::TcpStream;
use std::sync::Arc;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tungstenite::stream::*; use tungstenite::stream::*;
@ -17,13 +14,13 @@ use tungstenite::*;
pub async fn websocketreqrep( pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>, ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> { ) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event, wscfg); let (mut wssocketrr, bootstrap_event, cfg);
{ {
let ctx = ctxarc.read().await; let ctx = ctxarc.read().await;
bootstrap_event = ctx.cfg.bootstrap_event().clone(); bootstrap_event = ctx.cfg.bootstrap_event().clone();
wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone(); cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
} }
wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap(); wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await; send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr; return wssocketrr;
@ -44,30 +41,21 @@ pub async fn websocketpubsub(
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let mut ws = websocket.write().await; let mut ws = websocket.write().await;
match ws.read() { match ws.read_message() {
Ok(msg) => { Ok(msg) => {
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) { let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
Ok(o) => o, Ok(o) => o,
Err(e) => { Err(_e) => {
println!("error in pubsub: {e:?}");
continue; continue;
} }
}; };
match tosend.ipdata.clone() { if tosend.ipdata.hostname != gethostname(true)
Some(o) => {
if o.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string() || tosend.msgtype == "init".to_string()
{ {
let txps = txpubsub.read().await; let txps = txpubsub.write().await;
txps.send(tosend).await.unwrap(); txps.send(tosend).await.unwrap();
} }
} }
None => {
let txps = txpubsub.read().await;
txps.send(tosend.clone()).await.unwrap();
}
}
}
Err(e) => { Err(e) => {
println!("error in pubsub: {e:?}"); println!("error in pubsub: {e:?}");
ws.close(None).unwrap(); ws.close(None).unwrap();
@ -102,7 +90,9 @@ pub async fn websocketconnect<'a>(
} }
println!("connected to {endpoint}"); println!("connected to {endpoint}");
let msg = json!({ "hostname": hostname }); let msg = json!({ "hostname": hostname });
socket.send(Message::Text(msg.to_string().into())).unwrap(); socket
.write_message(Message::Text(msg.to_string()))
.unwrap();
Ok(socket) Ok(socket)
} }
@ -113,30 +103,32 @@ pub async fn send_to_ipbl_websocket(
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap()); let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
if ws.can_write() { if ws.can_write() {
match ws.send(Message::Text(msg.into())) { match ws.write_message(Message::Text(msg)) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err send read: {e:?}"); println!("err send read: {e:?}");
return false; return handle_websocket_error(ws);
} }
}; };
} else { } else {
println!("can't write to socket"); return handle_websocket_error(ws);
return false;
}; };
if ws.can_read() { if ws.can_read() {
match ws.read() { match ws.read_message() {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("err send read: {e:?}"); println!("err send read: {e:?}");
return false; return handle_websocket_error(ws);
} }
}; };
} else { } else {
println!("can't read from socket"); return handle_websocket_error(ws);
return false;
}; };
true true
} }
fn handle_websocket_error(ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> bool {
ws.close(None).unwrap();
return false;
}