initial version of ipblc after blparser renaming
All checks were successful
continuous-integration/drone/tag Build is passing
continuous-integration/drone Build is passing

This commit is contained in:
Paul 2022-05-27 13:59:17 +02:00
commit 85d85a4ae4
22 changed files with 2745 additions and 0 deletions

39
.drone.yml Normal file
View File

@ -0,0 +1,39 @@
---
kind: pipeline
type: docker
name: default
steps:
- name: test and build
image: rust:1
commands:
- apt-get update -y
- apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- cargo build --verbose --all
- cargo test --verbose --all
when:
event: push
- name: release
image: rust:1
commands:
- apt-get update -y
- apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- cargo build --release --verbose --all
- cd target/release
- tar -czvf ipblc-${DRONE_TAG}.tar.gz ipblc
when:
event: tag
- name: publish
image: plugins/gitea-release
settings:
base_url: https://git.paulbsd.com
api_key:
from_secret: gitea_token
files: "target/release/*.tar.gz"
checksum:
- sha256
- sha512
environment:
PLUGIN_TITLE: ""
when:
event: tag

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
*.json
*.swp
/*.gz
/perf*
/sample
/target

1277
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

40
Cargo.toml Normal file
View File

@ -0,0 +1,40 @@
[package]
name = "ipblc"
version = "1.0.0"
edition = "2021"
authors = ["PaulBSD <paul@paulbsd.com>"]
description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
repository = "https://git.paulbsd.com/paulbsd/ipblc"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = { version = "0.4", features = ["serde"] }
clap = "3.1"
ipnet = "2.5"
lazy_static = "1.4"
mnl = "0.2"
nftnl = "0.6"
nix = "0.24"
regex = "1.5"
reqwest = { version = "0.11", default-features = false, features = ["json","rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.18", features = ["full"] }
zmq = "0.9"
# [target.aarch64-unknown-linux-gnu.dependencies]
# flate2 = { version = "1.0", features = ["cloudflare_zlib"] }
# [target.aarch64-linux-android.dependencies]
# flate2 = { version = "1.0", features = ["zlib"] }
# [target.armv7-unknown-linux-gnueabihf.dependencies]
# flate2 = { version = "1.0", features = ["zlib"] }
# [target.x86_64-unknown-linux-gnu.dependencies]
# flate2 = { version = "1.0", features = ["cloudflare_zlib"] }
[profile.release]
debug = false
opt-level = 3

2
Cross.toml Normal file
View File

@ -0,0 +1,2 @@
[target.aarch64-unknown-linux-musl]
image = "cross-arm64:latest"

6
Dockerfile Normal file
View File

@ -0,0 +1,6 @@
FROM rustembedded/cross:aarch64-unknown-linux-musl
RUN dpkg --add-architecture arm64
RUN apt-get update
RUN apt-get install -y libasound2-dev:arm64 libzmq3-dev libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64
RUN apt-get clean

86
README.md Normal file
View File

@ -0,0 +1,86 @@
# ipblc
[![Build Status](https://drone.paulbsd.com/api/badges/paulbsd/ipblc/status.svg)](https://drone.paulbsd.com/paulbsd/ipblc)
## Summary
ipblc is a tool that search and send attacking ip addresses to ipbl
It's notification features are based on zeromq
## Howto
### Build
- Dev:
```bash
cargo build
```
- Release (with compiler optimizations)
```bash
cargo build --release
```
### Usage
```
USAGE:
ipblc [OPTIONS]
OPTIONS:
-d Enable debugging
-h, --help Print help information
-s, --server <server> Sets a ipbl server [default: https://ipbl.paulbsd.com]
-V, --version Print version informatio
```
### TODO
- ✅ Config centralization (Main config in ipbl)
- ✅ Handles date in log
- ✅ fine grain file opening
- ✅ Handle zeromq data transfer
- ✅ Code optimizations (WIP)
- ✅ Error handing when fetching config
### Date formats
```
nginx: 2022-02-09T10:05:02+01:00
ssh: 2022-02-09T09:29:15.797469+01:00
openvpn: 2022-02-09 09:58:59
mail: 2022-02-09T09:59:31.228303+01:00
```
## License
```text
Copyright (c) 2021, 2022 PaulBSD
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The views and conclusions contained in the software and documentation are those
of the authors and should not be interpreted as representing official policies,
either expressed or implied, of this project.
```

384
src/config/mod.rs Normal file
View File

@ -0,0 +1,384 @@
use crate::ip::*;
use crate::utils::*;
use chrono::prelude::*;
use chrono::Duration;
use clap::{Arg, ArgMatches, Command};
use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify, WatchDescriptor};
use regex::Regex;
use reqwest::{Client, Error as ReqError, Response};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::path::Path;
const SERVER: &str = "ipbl.paulbsd.com";
#[derive(Debug, Clone)]
pub struct Context {
pub blocklist: HashMap<String, IpData>,
pub cfg: Config,
pub client: Client,
pub discovery: Discovery,
pub flags: Flags,
pub hostname: String,
pub instance: Box<Inotify>,
pub sas: HashMap<String, SetMap>,
pub hashwd: HashMap<String, WatchDescriptor>,
}
#[derive(Debug, Clone)]
pub struct SetMap {
pub filename: String,
pub fullpath: String,
pub regex: Regex,
pub set: Set,
pub watchedfiles: HashMap<String, u64>,
pub wd: WatchDescriptor,
}
#[derive(Debug, Clone)]
pub struct Flags {
pub debug: bool,
pub interval: usize,
pub server: String,
}
impl Context {
pub async fn new() -> Self {
// Get flags
let debug = Context::argparse().is_present("debug");
let server = Context::argparse()
.value_of("server")
.unwrap_or(format!("https://{}", SERVER).as_str())
.to_string();
// Build context
let mut ctx = Context {
cfg: Config::new(),
flags: Flags {
debug: debug,
server: server,
interval: 60,
},
hostname: gethostname(true),
discovery: Discovery {
version: "1.0".to_string(),
urls: HashMap::new(),
},
client: Client::builder()
.user_agent(format!(
"{}/{}@{}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
gethostname(false)
))
.build()
.unwrap(),
sas: HashMap::new(),
instance: Box::new(Inotify::init(InitFlags::empty()).unwrap()),
blocklist: HashMap::new(),
hashwd: HashMap::new(),
};
ctx.discovery = ctx.discovery().await.unwrap();
print!("Loading config ... ");
match ctx.load().await {
Ok(_) => {}
Err(err) => {
println!("error loading config: {err}");
std::process::exit(1);
}
}
ctx
}
pub fn argparse() -> ArgMatches {
Command::new(env!("CARGO_PKG_NAME"))
.version(env!("CARGO_PKG_VERSION"))
.author(env!("CARGO_PKG_AUTHORS"))
.about(env!("CARGO_PKG_DESCRIPTION"))
.arg(
Arg::new("server")
.short('s')
.long("server")
.value_name("server")
.default_value("https://ipbl.paulbsd.com")
.help("Sets a http server")
.takes_value(true),
)
.arg(
Arg::new("debug")
.short('d')
.takes_value(false)
.help("Enable debugging"),
)
.get_matches()
}
pub async fn discovery(&self) -> Result<Discovery, ReqError> {
let resp: Result<Response, ReqError> = self
.client
.get(format!("{server}/discovery", server = self.flags.server))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Discovery = match req.json().await {
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
}
pub async fn load(&mut self) -> Result<(), Box<dyn std::error::Error>> {
self.cfg.load(self.to_owned()).await?;
self.create_sas().await?;
Ok(())
}
pub async fn get_blocklist(&self) -> Vec<IpData> {
let mut res: Vec<IpData> = vec![];
for (_, v) in self.blocklist.iter() {
res.push(v.clone());
}
res
}
pub async fn gc_blocklist(&mut self) -> Vec<IpData> {
let now: DateTime<Local> = Local::now().trunc_subsecs(0);
let delta: Duration = Duration::minutes(self.flags.interval as i64);
let mindate = now - delta;
let mut toremove: Vec<IpData> = vec![];
// nightly, future use
//let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate)
for (k, v) in self.blocklist.clone().iter() {
if v.parse_date() < mindate {
self.blocklist.remove(&k.to_string()).unwrap();
toremove.push(v.clone());
}
}
toremove
}
pub async fn update_blocklist(&mut self, ip: &IpData) {
if !self.blocklist.contains_key(&ip.ip) {
self.blocklist.insert(ip.ip.clone(), ip.clone());
}
}
pub async fn create_sas(&mut self) -> Result<(), Box<dyn std::error::Error>> {
for set in &self.cfg.sets {
let p = Path::new(set.path.as_str());
if p.is_dir() {
let res = match self.hashwd.get(&set.path.to_string()) {
Some(wd) => *wd,
None => {
let res = self
.instance
.add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY)
.unwrap();
self.hashwd.insert(set.path.to_string(), res);
res
}
};
let fullpath: String = match set.filename.as_str() {
"" => set.path.clone(),
_ => {
format!(
"{path}/{filename}",
path = set.path,
filename = set.filename.clone()
)
}
};
self.sas.insert(
set.t.clone(),
SetMap {
filename: set.filename.clone(),
fullpath: fullpath,
set: set.clone(),
regex: Regex::new(set.regex.as_str()).unwrap(),
wd: res,
watchedfiles: HashMap::new(),
},
);
}
}
Ok(())
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Config {
pub sets: Vec<Set>,
#[serde(skip_serializing)]
pub trustnets: Vec<String>,
pub zmq: HashMap<String, ZMQ>,
}
impl Config {
pub fn new() -> Self {
Self {
sets: vec![
Set {
t: "smtp".to_string(),
filename: "mail.log".to_string(),
regex: "(SASL LOGIN authentication failed)".to_string(),
path: "/var/log".to_string(),
},
Set {
t: "ssh".to_string(),
filename: "auth.log".to_string(),
regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(),
path: "/var/log".to_string(),
},
Set {
t: "http".to_string(),
filename: "".to_string(),
regex: "(anonymousfox.co)".to_string(),
path: "/var/log/nginx".to_string(),
}
,Set {
t: "openvpn".to_string(),
filename: "status".to_string(),
regex: "(UNDEF)".to_string(),
path: "/var/run/openvpn".to_string(),
},
],
trustnets: vec![
"127.0.0.0/8".to_string(),
"10.0.0.0/8".to_string(),
"172.16.0.0/12".to_string(),
"192.168.0.0/16".to_string(),
],
zmq: HashMap::from([("pubsub".to_string(),ZMQ{
t: "pubsub".to_string(),
hostname: SERVER.to_string(),
port: 9999,
subscription: "ipbl".to_string(),
}),("reqrep".to_string(),ZMQ {
t: "reqrep".to_string(),
hostname: SERVER.to_string(),
port: 9998,
subscription: String::new(),
})])
}
}
pub async fn load(&mut self, ctx: Context) -> Result<(), ReqError> {
self.get_sets(&ctx).await?;
self.get_trustnets(&ctx).await?;
self.get_zmq_config(&ctx).await?;
Ok(())
}
async fn get_trustnets(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx
.client
.get(format!(
"{server}/config/trustlist",
server = ctx.flags.server
))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<String> = match req.json::<Vec<String>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
self.trustnets = data;
Ok(())
}
async fn get_sets(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx
.client
.get(format!("{server}/config/sets", server = ctx.flags.server))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<Set> = match req.json::<Vec<Set>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
self.sets = data;
Ok(())
}
async fn get_zmq_config(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx
.client
.get(format!("{server}/config/zmq", server = ctx.flags.server))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: HashMap<String, ZMQ> = match req.json::<Vec<ZMQ>>().await {
Ok(res) => {
let mut out: HashMap<String, ZMQ> = HashMap::new();
res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.t.to_string(), x);
});
out
}
Err(err) => return Err(err),
};
self.zmq = data;
Ok(())
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Set {
#[serde(rename = "type")]
pub t: String,
pub filename: String,
pub regex: String,
pub path: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ZMQ {
#[serde(rename = "type")]
pub t: String,
pub hostname: String,
pub port: i64,
pub subscription: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Discovery {
pub version: String,
pub urls: HashMap<String, URL>,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct URL {
pub key: String,
pub path: String,
}
impl PartialEq for Set {
fn eq(&self, other: &Self) -> bool {
self.t == other.t
}
}
impl Eq for Set {}
impl Hash for Set {
fn hash<H: Hasher>(&self, state: &mut H) {
self.t.hash(state);
}
}

47
src/db.rs Normal file
View File

@ -0,0 +1,47 @@
use crate::config::*;
use crate::ip::*;
use futures::executor::block_on;
use rbatis;
use rbatis::crud::{Skip, CRUD};
use rbatis::rbatis::Rbatis;
use rbatis::DateTimeUtc;
use std::process::exit;
const DBTYPE: &'static str = "postgres";
pub async fn init(rb: &Rbatis, cfg: &Config) {
let connection: String = format!(
"{t}://{username}:{password}@{hostname}:{port}/{name}",
t = DBTYPE,
username = cfg.dbusername,
password = cfg.dbpassword,
hostname = cfg.dbhostname,
port = cfg.dbport,
name = cfg.dbname,
)
.to_string();
let link = rb.link(connection.as_str()).await;
match link {
Ok(_) => (),
Err(err) => {
println!("Please check connection parameters: {err}");
exit(1);
}
}
}
pub fn push_ip(ip: String, src: &String, rb: &Rbatis) {
let i = IP {
id: Some(0),
ip: Some(ip),
rdns: None,
src: Some(src.to_string()),
created: Some(DateTimeUtc::now()),
updated: Some(DateTimeUtc::now()),
};
match block_on(rb.save(&i, &[Skip::Column("id")])) {
Ok(r) => println!("{r:?}"),
Err(err) => println!("{error}", error = err.to_string()),
};
}

95
src/firewall/mod.rs Normal file
View File

@ -0,0 +1,95 @@
use crate::ip::*;
use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
use std::{ffi::CString, io::*, net::Ipv4Addr};
pub fn init(tablename: &String) -> (Batch, Table) {
let mut batch = Batch::new();
let table = Table::new(
&CString::new(tablename.as_str()).unwrap(),
ProtoFamily::Ipv4,
);
batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del);
batch.add(&table, nftnl::MsgType::Add);
(batch, table)
}
pub fn block(
tablename: &String,
ips_add: &Vec<IpData>,
ret: &mut Vec<String>,
) -> std::result::Result<(), Error> {
// convert chain
let ips_add = convert(ips_add);
let (mut batch, table) = init(tablename);
// build chain
let mut chain = Chain::new(&CString::new(tablename.as_str()).unwrap(), &table);
chain.set_hook(nftnl::Hook::In, 1);
chain.set_policy(nftnl::Policy::Accept);
// add chain
batch.add(&chain, nftnl::MsgType::Add);
// build and add rules
for ip in ips_add.clone() {
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(payload ipv4 saddr));
rule.add_expr(&nft_expr!(cmp == ip));
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict drop));
batch.add(&rule, nftnl::MsgType::Add);
}
// validate and send batch
let finalized_batch = batch.finalize();
send_and_process(&finalized_batch)?;
ret.push(format!(
"nftables: {length} ip in memory",
length = ips_add.len()
));
Ok(())
}
fn send_and_process(batch: &FinalizedBatch) -> std::result::Result<(), Error> {
let seq: u32 = 2;
let socket = mnl::Socket::new(mnl::Bus::Netfilter)?;
socket.send_all(batch)?;
let mut buffer = vec![0; nftnl::nft_nlmsg_maxsize() as usize];
while let Some(message) = socket_recv(&socket, &mut buffer[..])? {
match mnl::cb_run(message, seq, socket.portid())? {
mnl::CbResult::Stop => {
break;
}
mnl::CbResult::Ok => (),
}
}
Ok(())
}
fn socket_recv<'a>(
socket: &mnl::Socket,
buf: &'a mut [u8],
) -> std::result::Result<Option<&'a [u8]>, Error> {
let ret = socket.recv(buf)?;
if ret > 0 {
Ok(Some(&buf[..ret]))
} else {
Ok(None)
}
}
fn convert(input: &Vec<IpData>) -> Vec<Ipv4Addr> {
let mut output: Vec<Ipv4Addr> = vec![];
for val in input {
output.push(val.ip.parse::<Ipv4Addr>().unwrap());
}
output
}

252
src/ip.rs Normal file
View File

@ -0,0 +1,252 @@
use crate::config::Context;
use crate::utils::*;
use chrono::prelude::*;
use ipnet::IpNet;
use lazy_static::lazy_static;
use regex::Regex;
use reqwest::Error as ReqError;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::io::{BufRead, BufReader, Read};
use std::net::IpAddr;
lazy_static! {
static ref R_IPV4: Regex = Regex::new(include_str!("regexps/ipv4.txt")).unwrap();
static ref R_IPV6: Regex = Regex::new(include_str!("regexps/ipv6.txt")).unwrap();
static ref R_DATE: Regex = Regex::new(include_str!("regexps/date.txt")).unwrap();
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq)]
pub struct IpData {
pub ip: String,
pub src: String,
pub date: String,
pub hostname: String,
}
impl IpData {
pub fn parse_date(&self) -> DateTime<FixedOffset> {
DateTime::parse_from_rfc3339(self.date.as_str()).unwrap()
}
}
impl PartialEq for IpData {
fn eq(&self, other: &IpData) -> bool {
self.ip.as_bytes() == other.ip.as_bytes() && self.src == other.src
}
fn ne(&self, other: &IpData) -> bool {
!self.eq(other)
}
}
impl Ord for IpData {
fn cmp(&self, other: &IpData) -> Ordering {
self.ip.as_bytes().cmp(&other.ip.as_bytes())
}
}
impl PartialOrd for IpData {
fn partial_cmp(&self, other: &IpData) -> Option<Ordering> {
Some(self.cmp(&other))
}
}
impl Display for IpData {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"ip: {ip}, src: {src}, date: {date}, hostname: {hostname}",
ip = self.ip,
src = self.src,
date = self.date,
hostname = self.hostname,
)
}
}
pub async fn push_ip(ctx: &Context, ip: &IpData, ret: &mut Vec<String>) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
data.push(IpData {
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
});
let resp = ctx
.client
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}
pub async fn _push_ip_bulk(
ctx: &Context,
ips: &Vec<IpData>,
ret: &mut Vec<String>,
) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
for ip in ips {
data.push(IpData {
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
})
}
let resp = ctx
.client
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}
pub fn filter(
lines: Box<dyn Read>,
list: &mut Vec<IpData>,
trustnets: &Vec<IpNet>,
regex: &Regex,
src: &String,
lastprocess: &DateTime<Local>,
) -> isize {
let mut ips = 0;
let hostname = gethostname(true);
for line in BufReader::new(lines).lines() {
if let Ok(l) = line {
if regex.is_match(l.as_str()) {
let s_ipaddr: String;
match R_IPV4.captures(l.as_str()) {
Some(sv4) => {
s_ipaddr = sv4.get(0).unwrap().as_str().to_string();
}
None => {
continue;
/*match R_IPV6.captures(l.as_str()) {
Some(sv6) => {
s_ipaddr = sv6.get(0).unwrap().as_str().to_string();
}
None => {
continue;
}
};*/
}
};
let s_date: DateTime<Local>;
match R_DATE.captures(l.as_str()) {
Some(sdt) => {
s_date = parse_date(sdt);
if &s_date < lastprocess {
continue;
}
}
None => {
s_date = Local::now();
}
};
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
if !is_trusted(&ipaddr, &trustnets) {
list.push(IpData {
ip: s_ipaddr,
src: src.to_owned(),
date: s_date.to_rfc3339().to_owned(),
hostname: hostname.to_owned(),
});
ips += 1;
};
}
}
}
ips
}
fn parse_date(input: regex::Captures) -> DateTime<Local> {
let mut ymd: Vec<u64> = vec![];
let mut hms: Vec<u64> = vec![];
let (daterange, hourrange) = (2..5, 5..8);
for i in daterange {
ymd.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
}
for i in hourrange {
hms.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
}
let date = Local
.ymd(ymd[0] as i32, ymd[1] as u32, ymd[2] as u32)
.and_hms(hms[0] as u32, hms[1] as u32, hms[2] as u32);
date
}
fn is_trusted(ip: &IpAddr, trustnets: &Vec<IpNet>) -> bool {
for net in trustnets {
if net.contains(ip) {
return true;
}
}
false
}
pub async fn _get_last(ctx: &Context) -> Result<Vec<IpData>, ReqError> {
let resp = ctx
.client
.get(format!("{server}/ips/last", server = ctx.flags.server))
.query(&[("interval", "3 hours")])
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
}

73
src/ipblc/full.rs Normal file
View File

@ -0,0 +1,73 @@
use crate::api;
use crate::config::*;
use crate::ip;
use crate::utils::*;
use chrono::prelude::*;
use regex::Regex;
use std::sync::Arc;
pub async fn process(ctx: Arc<Context>) {
let arcctx = Arc::clone(&ctx);
let hostname = arcctx.hostname.as_str().to_string();
loop {
let mut ret = String::new();
ret.push_str(format!("host: {hostname}, ", hostname = hostname).as_str());
let list = process_lines(arcctx.clone()).await;
loop {
match api::push_ip_bulk(&list, arcctx.flags.server.clone(), &mut ret).await {
Ok(_) => {
break println!("{ret}");
}
Err(err) => {
println!("{err}");
sleep(1);
}
};
}
sleep(ctx.flags.interval as u64);
println!("{ret}");
}
}
async fn process_lines(ctx: Arc<Context>) -> Vec<ip::IpData> {
let mut ret = String::new();
let mut list: Vec<ip::IpData> = vec![];
let trustnets = build_trustnets(ctx.cfg.trustnets.clone());
let lastprocess = Local::now();
for folder in &ctx.cfg.folders {
for set in &folder.sets {
let regex = Regex::new(set.regex.as_str()).unwrap();
let filenames = get_filenames(&set.filename);
let mut numip = 0;
for filename in filenames {
numip = numip
+ super::handle_file(
&mut list,
&filename,
set,
&regex,
&lastprocess,
&trustnets,
)
.await;
}
ret.push_str(format!("{t}: {numip}, ", t = set.t, numip = numip).as_str());
}
}
let oldlen = list.len();
let newlen = dedup(&mut list);
ret.push_str(
format!(
"dedup ratio: {percent:.2}%, ",
percent = ((oldlen - newlen) as f64 / oldlen as f64) * 100.
)
.as_str(),
);
list
}

180
src/ipblc/inc.rs Normal file
View File

@ -0,0 +1,180 @@
use super::*;
use chrono::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
const BL_CHAN_SIZE: usize = 32;
const ZMQ_CHAN_SIZE: usize = 64;
pub async fn process(ctx: &Arc<Mutex<Context>>) {
println!(
"Launching {} version {}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION")
);
let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE);
// initialize the firewall table
//firewall::init(&env!("CARGO_PKG_NAME").to_string());
// initialize zeromq sockets
let reqsocket;
let subsocket;
{
let ctxarc = Arc::clone(&ctx);
let zmqctx = ctxarc.lock().await;
reqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await
.unwrap();
subsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
.await
.unwrap();
}
listenpubsub(&ctx, ipdatatx.clone(), subsocket).await;
let mut blrx = watchfiles(&ctx).await;
let ctxarc = Arc::clone(&ctx);
tokio::spawn(async move {
compare_files_changes(&ctxarc, &mut blrx, &ipdatatx).await;
});
loop {
let mut ret: Vec<String> = Vec::new();
// wait for logs parse and zmq channel receive
let ip = ipdatarx.recv().await.unwrap();
// lock the context mutex
let ctxarc = Arc::clone(&ctx);
let mut ctx = ctxarc.lock().await;
// refresh context blocklist
ctx.update_blocklist(&ip).await;
ctx.gc_blocklist().await;
// send ip list to ws and zmq sockets
if ip.hostname == ctx.hostname {
send_to_ipbl_ws(&ctx, &ip, &mut ret).await;
send_to_ipbl_zmq(&reqsocket, &ip).await;
}
// apply firewall blocking
firewall::block(
&env!("CARGO_PKG_NAME").to_string(),
&ctx.get_blocklist().await,
&mut ret,
)
.unwrap();
// log lines
println!("{ret}", ret = ret.join(", "));
// reload configuration from the server
match ctx.load().await {
Ok(_) => {}
Err(err) => {
println!("error loading config: {err}");
}
}
}
}
async fn watchfiles(ctx: &Arc<Mutex<Context>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
let ctx = Arc::clone(ctx);
tokio::spawn(async move {
loop {
let events: Vec<InotifyEvent>;
{
let ctx = ctx.lock().await;
events = ctx.instance.read_events().unwrap();
}
for event in events {
let date: DateTime<Local> = Local::now().trunc_subsecs(0);
bltx.send(FileEvent {
ie: event,
date: date,
})
.await
.unwrap();
}
}
});
blrx
}
async fn get_last_file_size(watchedfiles: &mut HashMap<String, u64>, path: &str) -> u64 {
let currentlen = match std::fs::metadata(&path.to_string()) {
Ok(u) => u.len().clone(),
Err(_) => 0u64,
};
let lastlen = match watchedfiles.insert(path.to_string(), currentlen) {
Some(u) => u,
None => 0,
};
lastlen
}
async fn compare_files_changes(
ctx: &Arc<Mutex<Context>>,
inotifyrx: &mut Receiver<FileEvent>,
ipdatatx: &Sender<IpData>,
) {
let mut trustnets;
loop {
let modifiedfiles = inotifyrx.recv().await.unwrap();
let mut list: Vec<IpData> = vec![];
let mut ctx = ctx.lock().await;
trustnets = build_trustnets(&ctx.cfg.trustnets);
match modifiedfiles.ie.name {
Some(name) => {
let inotify_filename = name.to_str().unwrap();
for sak in &mut ctx.clone().sas.keys() {
let sa = &mut ctx.sas.get_mut(sak).unwrap();
if modifiedfiles.ie.wd == sa.wd {
let handle_filename: String;
if sa.filename.as_str() == "" {
handle_filename = format!("{}/{}", &sa.fullpath, inotify_filename);
} else if inotify_filename.starts_with(sa.filename.as_str()) {
handle_filename = sa.fullpath.to_owned();
} else {
continue;
}
let filesize =
get_last_file_size(&mut sa.watchedfiles, &handle_filename).await;
match read_lines(&handle_filename, filesize) {
Some(lines) => {
filter(
lines,
&mut list,
&trustnets,
&sa.regex,
&sa.set.t,
&modifiedfiles.date,
);
}
None => {}
};
break;
}
}
drop(ctx);
for ip in list {
ipdatatx.send(ip).await.unwrap();
}
}
None => {}
}
}
}

79
src/ipblc/mod.rs Normal file
View File

@ -0,0 +1,79 @@
pub mod inc;
use crate::config::*;
use crate::firewall;
use crate::ip::*;
use crate::utils::*;
use crate::zmqcom::*;
use chrono::prelude::{DateTime, Local};
use nix::sys::inotify::InotifyEvent;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
pub struct FileEvent {
pub ie: InotifyEvent,
pub date: DateTime<Local>,
}
impl std::fmt::Debug for FileEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{ie:?}", ie = self.ie)
}
}
async fn send_to_ipbl_zmq(socket: &zmq::Socket, ip: &IpData) {
let msg = format!("{value}", value = serde_json::to_string(&ip).unwrap());
socket.send(&msg, 0).unwrap();
socket.recv_string(0).unwrap().unwrap();
}
async fn send_to_ipbl_ws(ctx: &Context, ip: &IpData, ret: &mut Vec<String>) {
ret.push(format!("host: {hostname}", hostname = ctx.hostname));
loop {
match push_ip(&ctx, &ip, ret).await {
Ok(_) => {
break;
}
Err(err) => {
println!("{err}");
sleep(1);
}
};
}
}
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
let ctx = ctx.lock().await;
let prefix = format!(
"{subscription} ",
subscription = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
drop(ctx);
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(_) => None,
},
Err(_) => None,
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpData = serde_json::from_str(msg).unwrap();
if tosend.hostname != gethostname(true) {
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}

17
src/main.rs Normal file
View File

@ -0,0 +1,17 @@
mod config;
mod firewall;
mod ip;
mod ipblc;
mod utils;
mod zmqcom;
use config::Context;
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main]
pub async fn main() {
// Create a new context
let ctx = Arc::new(Mutex::new(Context::new().await));
ipblc::inc::process(&ctx).await;
}

1
src/regexps/date.txt Normal file
View File

@ -0,0 +1 @@
((\d{1,4})[-/](\d{2})[-/](\d{2})[T ](\d{2}):(\d{2}):(\d{2})((\.\d{6})?[+-](\d{2}):(\d{2}))?)

1
src/regexps/ipv4.txt Normal file
View File

@ -0,0 +1 @@
(?:[0-9]{1,3}\.){3}[0-9]{1,3}

1
src/regexps/ipv6.txt Normal file
View File

@ -0,0 +1 @@
((^\s*((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*$)|(^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$))

82
src/utils.rs Normal file
View File

@ -0,0 +1,82 @@
use ipnet::IpNet;
use lazy_static::lazy_static;
use nix::unistd;
use regex::Regex;
use std::boxed::Box;
use std::fs::File;
use std::io::*;
use std::path::Path;
use std::time::Duration;
lazy_static! {
static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap();
}
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
let mut file = match File::open(filename) {
Ok(f) => f,
Err(err) => {
println!("{err}");
return None;
}
};
file.seek(SeekFrom::Start(offset)).unwrap();
let lines: Box<dyn Read> = Box::new(BufReader::new(file));
Some(lines)
}
pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
// Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
}
pub fn build_trustnets(cfgtrustnets: &Vec<String>) -> Vec<IpNet> {
let mut trustnets: Vec<IpNet> = vec![];
for trustnet in cfgtrustnets {
match trustnet.parse() {
Ok(net) => trustnets.push(net),
Err(err) => {
println!("error parsing {trustnet}, error: {err}");
}
};
}
trustnets
}
pub fn sleep(seconds: u64) {
std::thread::sleep(Duration::from_secs(seconds));
}
pub fn gethostname(show_fqdn: bool) -> String {
let mut buf = [0u8; 64];
let hostname_cstr = unistd::gethostname(&mut buf).expect("Failed getting hostname");
let fqdn = hostname_cstr
.to_str()
.expect("Hostname wasn't valid UTF-8")
.to_string();
let hostname: Vec<&str> = fqdn.split(".").collect();
if show_fqdn {
return fqdn;
}
hostname[0].to_string()
}
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}

15
src/zmqcom.rs Normal file
View File

@ -0,0 +1,15 @@
use crate::config::*;
use zmq;
const ZMQPROTO: &str = "tcp";
pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result<zmq::Socket, zmq::Error> {
let zctx = zmq::Context::new();
let zmqhost = &zmqcfg.hostname;
let zmqport = zmqcfg.port;
let socket = zctx.socket(zmqtype).unwrap();
let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}");
socket.connect(&connectstring.as_str())?;
Ok(socket)
}

12
tests/test.py Executable file
View File

@ -0,0 +1,12 @@
#!/usr/bin/python3
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://sys01:9999")
socket.setsockopt_string(zmq.SUBSCRIBE, "ipbl")
while True:
msg = socket.recv_string()
print(msg)

50
tests/testfw.rs Normal file
View File

@ -0,0 +1,50 @@
use ipnet::Ipv4Net;
use nftnl::{nft_set, set::Set, Batch, FinalizedBatch, ProtoFamily, Table};
use std::{ffi::CString, io::*, net::Ipv4Addr};
fn main() -> std::result::Result<(), Error> {
let mut batch = Batch::new();
let table = Table::new(&CString::new("aa").unwrap(), ProtoFamily::Inet);
let mut set: Set<Ipv4Addr> = nft_set!(
&CString::new("blacklist").unwrap(),
1,
&table,
ProtoFamily::Inet
);
let toadd = "9.9.9.8".parse::<Ipv4Addr>().unwrap();
set.add(&toadd);
println!("2");
batch.add(&set, nftnl::MsgType::Add);
let finalized_batch = batch.finalize();
send_and_process(&finalized_batch)?;
Ok(())
}
fn send_and_process(batch: &FinalizedBatch) -> std::result::Result<(), Error> {
let socket = mnl::Socket::new(mnl::Bus::Netfilter)?;
socket.send_all(batch)?;
let portid = socket.portid();
let mut buffer = vec![0; nftnl::nft_nlmsg_maxsize() as usize];
let very_unclear_what_this_is_for = 2;
while let Some(message) = socket_recv(&socket, &mut buffer[..])? {
match mnl::cb_run(message, very_unclear_what_this_is_for, portid)? {
mnl::CbResult::Stop => {
break;
}
mnl::CbResult::Ok => (),
}
}
Ok(())
}
fn socket_recv<'a>(
socket: &mnl::Socket,
buf: &'a mut [u8],
) -> std::result::Result<Option<&'a [u8]>, Error> {
let ret = socket.recv(buf)?;
if ret > 0 {
Ok(Some(&buf[..ret]))
} else {
Ok(None)
}
}