Compare commits

..

No commits in common. "master" and "1.0.0" have entirely different histories.

31 changed files with 1516 additions and 3640 deletions

View File

@ -1,59 +1,28 @@
--- ---
kind: pipeline kind: pipeline
type: docker type: docker
name: default-amd64 name: default
platform:
os: linux
arch: amd64
steps: steps:
- name: build and test - name: test and build
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev - apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH} - cargo build --verbose --all
- chmod +x $${RUSTC_WRAPPER} - cargo test --verbose --all
- cargo b -v
- cargo t -v
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes:
- name: cargo
path: /usr/local/cargo/registry
- name: apt
path: /var/cache/apt
when: when:
event: event: push
exclude:
- tag
- name: release - name: release
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev - apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH} - cargo build --release --verbose --all
- chmod +x $${RUSTC_WRAPPER}
- cargo b -r -v
- cd target/release - cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}.tar.gz ipblc
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes:
- name: cargo
path: /usr/local/cargo/registry
- name: apt
path: /var/cache/apt
when: when:
event: event: tag
- tag
- name: publish - name: publish
image: plugins/gitea-release image: plugins/gitea-release
settings: settings:
@ -61,91 +30,10 @@ steps:
api_key: api_key:
from_secret: gitea_token from_secret: gitea_token
files: "target/release/*.tar.gz" files: "target/release/*.tar.gz"
checksum:
- sha256
- sha512
environment: environment:
PLUGIN_TITLE: "" PLUGIN_TITLE: ""
when: when:
event: event: tag
- tag
volumes:
- name: cargo
host:
path: /home/drone/cache/cargo
- name: apt
host:
path: /home/drone/cache/apt
---
kind: pipeline
type: docker
name: default-arm64
platform:
os: linux
arch: arm64
steps:
- name: build and test
image: rust:1
pull: always
commands:
- apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH}
- chmod +x $${RUSTC_WRAPPER}
- cargo b -v
- cargo t -v
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes:
- name: cargo
path: /usr/local/cargo/registry
- name: apt
path: /var/cache/apt
when:
event:
exclude:
- tag
- name: release
image: rust:1
pull: always
commands:
- apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH}
- chmod +x $${RUSTC_WRAPPER}
- cargo b -r -v
- cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes:
- name: cargo
path: /usr/local/cargo/registry
- name: apt
path: /var/cache/apt
when:
event:
- tag
- name: publish
image: plugins/gitea-release
settings:
base_url: https://git.paulbsd.com
api_key:
from_secret: gitea_token
files: "target/release/*.tar.gz"
environment:
PLUGIN_TITLE: ""
when:
event:
- tag
volumes:
- name: cargo
host:
path: /home/drone/cache/cargo
- name: apt
host:
path: /home/drone/cache/apt

1
.gitignore vendored
View File

@ -1,6 +1,5 @@
*.json *.json
*.swp *.swp
/*diff*
/*.gz /*.gz
/perf* /perf*
/sample /sample

1982
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package] [package]
name = "ipblc" name = "ipblc"
version = "1.8.1" version = "1.0.0"
edition = "2021" edition = "2021"
authors = ["PaulBSD <paul@paulbsd.com>"] authors = ["PaulBSD <paul@paulbsd.com>"]
description = "ipblc is a tool that search and send attacking ip addresses to ipbl" description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
@ -10,23 +10,31 @@ repository = "https://git.paulbsd.com/paulbsd/ipblc"
[dependencies] [dependencies]
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.5", features = ["string"] } clap = "3.1"
git-version = "0.3" ipnet = "2.5"
ipnet = "2.11" lazy_static = "1.4"
lazy_static = "1.5" mnl = "0.2"
nix = { version = "0.30", features = ["hostname", "inotify"] } nftnl = "0.6"
regex = "1.11" nix = "0.24"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } regex = "1.5"
rustables = "0.8.6" reqwest = { version = "0.11", default-features = false, features = ["json","rustls-tls"] }
rustables-macros = "0.1.2"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sd-notify = { version = "0.4" } tokio = { version = "1.18", features = ["full"] }
tokio = { version = "1.45", features = ["full", "sync"] } zmq = "0.9"
tungstenite = { version = "0.26", features = ["handshake", "rustls-tls-native-roots"] }
## to optimize binary size (slow compile time) # [target.aarch64-unknown-linux-gnu.dependencies]
#[profile.release] # flate2 = { version = "1.0", features = ["cloudflare_zlib"] }
#strip = true
#lto = true # [target.aarch64-linux-android.dependencies]
#opt-level = "z" # flate2 = { version = "1.0", features = ["zlib"] }
# [target.armv7-unknown-linux-gnueabihf.dependencies]
# flate2 = { version = "1.0", features = ["zlib"] }
# [target.x86_64-unknown-linux-gnu.dependencies]
# flate2 = { version = "1.0", features = ["cloudflare_zlib"] }
[profile.release]
debug = false
opt-level = 3

View File

@ -2,5 +2,5 @@ FROM rustembedded/cross:aarch64-unknown-linux-musl
RUN dpkg --add-architecture arm64 RUN dpkg --add-architecture arm64
RUN apt-get update RUN apt-get update
RUN apt-get install -y libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64 RUN apt-get install -y libasound2-dev:arm64 libzmq3-dev libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64
RUN apt-get clean RUN apt-get clean

View File

@ -1,10 +0,0 @@
# Notes
### Date formats
```
nginx: 2006-01-02T15:04:05+01:00
ssh: 2006-01-02T15:04:05.000000+01:00
openvpn: 2006-01-02 15:04:05
mail: 2006-01-02T15:04:05.000000+01:00
```

View File

@ -4,8 +4,8 @@
## Summary ## Summary
ipblc is client-side intrusion prevention software working closely with ipbl ipblc is a tool that search and send attacking ip addresses to ipbl
It's pub/sub features are websockets based It's notification features are based on zeromq
## Howto ## Howto
@ -26,39 +26,38 @@ cargo build --release
### Usage ### Usage
``` ```
ipblc is a tool that search and send attacking ip addresses to ipbl USAGE:
ipblc [OPTIONS]
Usage: ipblc [OPTIONS] OPTIONS:
-d Enable debugging
Options: -h, --help Print help information
-s, --server <server> Sets a http server [default: https://ipbl.paulbsd.com] -s, --server <server> Sets a ipbl server [default: https://ipbl.paulbsd.com]
-d Enable debugging -V, --version Print version informatio
-h, --help Print help information
-V, --version Print version information
``` ```
### TODO ### TODO
- ✅ Config centralization (Main config in ipbl) - ✅ Config centralization (Main config in ipbl)
- ✅ Handles date in log - ✅ Handles date in log
- ✅ Fine grain file opening - ✅ fine grain file opening
- ✅ Handle zeromq data transfer - ✅ Handle zeromq data transfer
- ✅ Code optimizations (WIP) - ✅ Code optimizations (WIP)
- ✅ Error handing when fetching config - ✅ Error handing when fetching config
- ✅ Local bound tcp api socket
- ✅ ZMQ -> Websocket
- ✅ Bug in RwLocks (agent often give up)
- ❌ Create memory friendly structs for ipdata
### Date formats
### Notes ```
nginx: 2022-02-09T10:05:02+01:00
See [here](NOTES.md) ssh: 2022-02-09T09:29:15.797469+01:00
openvpn: 2022-02-09 09:58:59
mail: 2022-02-09T09:59:31.228303+01:00
```
## License ## License
```text ```text
Copyright (c) 2022, 2023 PaulBSD Copyright (c) 2021, 2022 PaulBSD
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without Redistribution and use in source and binary forms, with or without

View File

@ -1,143 +0,0 @@
use rustables::*;
use rustables::{expr::*, Chain, Rule, Table};
use std::{io::*, net::*};
const TABLE_NAME: &str = "ipblc4";
const CHAIN_NAME: &str = "ipblc";
fn main() -> Result<()> {
/*let name = "blabla";
let mut batch = Batch::new();
let table = Table::new(ProtocolFamily::Ipv4).with_name(name);
batch.add(&table, MsgType::Add);
let mut chain = Chain::new(&table).with_name(name);
batch.add(&chain, MsgType::Add);
let toadd1: Ipv4Addr = "9.9.9.8".parse().unwrap();
let toadd2: Ipv4Addr = "9.9.9.1".parse().unwrap();
let mut setbuilder: SetBuilder<Ipv4Addr> = SetBuilder::new("s1", &table).unwrap();
setbuilder.add(&toadd1);
setbuilder.add(&toadd2);
let (mut set, setelem) = setbuilder.finish();
batch.add(&setelem, MsgType::Add);
//batch.add(&set, MsgType::Add);
set.family = ProtocolFamily::Ipv4;
set.id = Some(5);
set.flags = Some(0);
set.userdata = Some("test".into());
println!("{:?}", setelem);*/
let get_table = || -> Result<Option<Table>> {
let tables = list_tables().unwrap();
for table in tables {
if let Some(name) = table.get_name() {
println!("Found table {}", name);
if name == TABLE_NAME {
return Ok(Some(table));
}
}
}
Ok(None)
};
let get_chain = |table: &Table| -> Result<Option<Chain>> {
let chains = list_chains_for_table(table).unwrap();
for chain in chains {
if let Some(name) = chain.get_name() {
println!("Found chain {}", name);
if name == CHAIN_NAME {
return Ok(Some(chain));
}
}
}
Ok(None)
};
let table = get_table().unwrap().expect("no table?");
let chain = get_chain(&table).unwrap().expect("no chain?");
let ip: IpAddr = "184.73.167.217".parse().unwrap();
let cmprule = Rule::new(&chain).unwrap().saddr(ip).drop();
println!("{:?}", cmprule);
let mut gexpr = RawExpression::default();
for e in cmprule.get_expressions().unwrap().iter() {
if let Some(ExpressionVariant::Cmp(_)) = e.get_data() {
gexpr = e.clone();
}
}
let rules = list_rules_for_chain(&chain).unwrap();
for rule in rules {
let handle = rule.get_handle().unwrap();
println!("handle {}", handle);
let exprs = rule.get_expressions().unwrap();
for expr in exprs.iter() {
if let Some(ExpressionVariant::Cmp(_)) = expr.get_data() {
if expr.clone() == gexpr {
println!("{:?}", expr.get_data());
println!("test");
break;
}
}
//if expr.get_data()
//if expr.
}
}
//let mut set: Set<Ipv4Addr> = nft_set!(
// &CString::new("blabla").unwrap(),
// 32,
// &table,
// ProtoFamily::Ipv4 //ProtoFamily::Ipv4;
// //[&toadd1,&toadd2,]
//);
////println!("{:?}", set.0);
//set.add(&toadd1);
//set.add(&toadd2);
//batch.add(&set, MsgType::Add);
//let mut rule = Rule::new(&chain)
// .unwrap()
// .with_expr(
// HighLevelPayload::Network(NetworkHeaderField::IPv4(IPv4HeaderField::Saddr)).build(),
// )
// .with_expr(Lookup::new(&set).unwrap())
// .with_expr(Immediate::new_verdict(VerdictKind::Accept));
//println!("{:?}", rule);
//batch.add(&rule, rustables::MsgType::Add);
//match batch.send() {
// Ok(o) => {}
// Err(e) => {
// println!("{e}");
// }
//}
//rule.add_expr(&nft_expr!(payload ipv4 saddr));
//#[rustfmt::skip]
//rule.add_expr(&nft_expr!(lookup &set));
//rule.add_expr(&nft_expr!(ct state));
//rule.add_expr(&nft_expr!(verdict drop));
//batch.add(&rule, MsgType::Add);
//let finalized_batch = batch.finalize();
//send_and_process(&finalized_batch)?;
Ok(())
}
#[allow(dead_code)]
#[derive(Debug)]
struct Error(String);
impl<T: std::error::Error> From<T> for Error {
fn from(error: T) -> Self {
Error(error.to_string())
}
}

View File

@ -1,65 +0,0 @@
use nftnl::{nft_expr, set::Set, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
use std::{ffi::CString, io::*, net::Ipv4Addr};
fn main() -> std::result::Result<(), Error> {
let table_name = format!("ipblc4");
let table = Table::new(
&CString::new(format!("{table_name}")).unwrap(),
ProtoFamily::Ipv4,
);
let mut batch = Batch::new();
batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del);
batch.add(&table, nftnl::MsgType::Add);
let mut chain = Chain::new(&CString::new("test").unwrap(), &table);
chain.set_hook(nftnl::Hook::In, 1);
chain.set_policy(nftnl::Policy::Accept);
batch.add(&chain, nftnl::MsgType::Add);
batch.add(&Rule::new(&chain), nftnl::MsgType::Del);
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept));
batch.add(&rule, nftnl::MsgType::Add);
let finalized_batch = batch.finalize();
send_and_process(&finalized_batch)?;
Ok(())
}
fn send_and_process(batch: &FinalizedBatch) -> std::result::Result<(), Error> {
let seq: u32 = 2;
let socket = mnl::Socket::new(mnl::Bus::Netfilter)?;
socket.send_all(batch)?;
let mut buffer = vec![0; nftnl::nft_nlmsg_maxsize() as usize];
while let Some(message) = socket_recv(&socket, &mut buffer[..])? {
match mnl::cb_run(message, seq, socket.portid())? {
mnl::CbResult::Stop => {
break;
}
mnl::CbResult::Ok => (),
}
}
Ok(())
}
fn socket_recv<'a>(
socket: &mnl::Socket,
buf: &'a mut [u8],
) -> std::result::Result<Option<&'a [u8]>, Error> {
let ret = socket.recv(buf)?;
if ret > 0 {
Ok(Some(&buf[..ret]))
} else {
Ok(None)
}
}

View File

@ -1,28 +0,0 @@
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}
pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
// Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
}
pub async fn _sleep_ms(ms: u64) {
sleep(Duration::from_millis(ms)).await;
}

View File

@ -1,106 +0,0 @@
use crate::config::{Context, ZMQCfg};
use crate::ip::IpEvent;
use crate::utils::gethostname;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
const ZMQPROTO: &str = "tcp";
pub async fn zconnect<'a>(
zmqcfg: &ZMQCfg,
zmqtype: zmq::SocketType,
) -> Result<zmq::Socket, zmq::Error> {
let zctx = zmq::Context::new();
let zmqhost = &zmqcfg.hostname;
let zmqport = &zmqcfg.port;
let socket = zctx.socket(zmqtype).unwrap();
let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}");
socket.connect(&connectstring.as_str())?;
Ok(socket)
}
pub async fn zmqinit(ctx: &Arc<RwLock<Context>>, ipeventtx: &Sender<IpEvent>) -> zmq::Socket {
let ctxarc = Arc::clone(&ctx);
let zmqreqsocket;
let zmqsubsocket;
{
let zmqctx = ctxarc.read().await;
zmqreqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await
.unwrap();
zmqsubsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
.await
.unwrap();
}
listenpubsub(&ctx, ipeventtx.clone(), zmqsubsocket).await;
return zmqreqsocket;
}
async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpEvent>, socket: zmq::Socket) {
let prefix;
{
let ctx = ctx.read().await;
prefix = format!(
"{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
}
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(e) => {
println!("{e:?}");
None
}
},
Err(e) => {
println!("{e:?}");
None
}
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpEvent = serde_json::from_str(msg).unwrap();
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}
pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent, _ret: &mut Vec<String>) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}
Err(e) => {
println!("{e:?}")
}
};
match reqsocket.recv_string(0) {
Ok(o) => match o {
Ok(_) => {}
Err(ee) => {
println!("{ee:?}")
}
},
Err(e) => {
println!("{e:?}")
}
};
}

View File

@ -1,6 +0,0 @@
#!/bin/sh
git pull
cargo b --release
sudo systemctl stop ipblc
sudo cp target/release/ipblc /usr/local/apps/ipblc/ipblc
sudo systemctl start ipblc

View File

@ -1,685 +0,0 @@
use crate::ip::{BlockIpData, IpData, IpEvent};
use crate::utils::{gethostname, sleep_s};
use std::{
collections::HashMap,
hash::{Hash, Hasher},
path::Path,
};
use chrono::prelude::*;
use chrono::Duration;
use clap::{Arg, ArgAction, ArgMatches, Command};
use git_version::git_version;
use ipnet::IpNet;
use nix::sys::inotify::{AddWatchFlags, Inotify, WatchDescriptor};
use regex::Regex;
use reqwest::{Client, Error as ReqError, Response};
use serde::{Deserialize, Serialize};
pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]);
const MASTERSERVER: &str = "ipbl.paulbsd.com";
const WSSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY_INTERVAL: u64 = 2;
const WEB_CLIENT_TIMEOUT: i64 = 5;
#[derive(Debug)]
pub struct Context {
pub blocklist: HashMap<String, BlockIpData>,
pub cfg: Config,
pub discovery: Discovery,
pub flags: Flags,
pub sas: HashMap<String, SetMap>,
pub hashwd: HashMap<String, WatchDescriptor>,
pub reloadinterval: u64,
}
#[derive(Debug, Clone)]
pub struct SetMap {
pub filename: String,
pub fullpath: String,
pub regex: Regex,
pub set: SetCfg,
pub watchedfiles: HashMap<String, u64>,
pub wd: WatchDescriptor,
}
#[derive(Debug, Clone)]
pub struct Flags {
#[allow(dead_code)]
pub debug: bool,
pub server: String,
}
impl Context {
pub async fn new(inotify: &Inotify) -> Self {
// Get flags
let argp: ArgMatches = Context::argparse();
let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned();
let server: String = argp.get_one::<String>("server").unwrap().to_string();
// Build context
let mut ctx = Context {
cfg: Config::new(),
flags: Flags { debug, server },
discovery: Discovery {
version: "1.0".to_string(),
urls: HashMap::new(),
},
sas: HashMap::new(),
blocklist: HashMap::new(),
hashwd: HashMap::new(),
reloadinterval: 5,
};
print!("Loading config ... ");
ctx.load(&inotify).await.unwrap();
ctx
}
pub fn argparse() -> ArgMatches {
Command::new(env!("CARGO_PKG_NAME"))
.version(format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION))
.author(env!("CARGO_PKG_AUTHORS"))
.about(env!("CARGO_PKG_DESCRIPTION"))
.arg(
Arg::new("server")
.short('s')
.long("server")
.value_name("server")
.default_value(format!("https://{MASTERSERVER}"))
.help("Sets a http server"),
)
.arg(
Arg::new("debug")
.short('d')
.help("Enable debugging")
.action(ArgAction::SetTrue),
)
.get_matches()
}
#[allow(dead_code)]
pub async fn discovery(&self) -> Result<Discovery, ReqError> {
let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/discovery", server = self.flags.server))
.send()
.await;
let req = match resp {
Ok(o) => o,
Err(e) => return Err(e),
};
let data: Discovery = match req.json().await {
Ok(o) => o,
Err(e) => return Err(e),
};
Ok(data)
}
pub async fn load(&mut self, inotify: &Inotify) -> Result<(), Box<dyn std::error::Error>> {
if cfg!(test) {
return Ok(());
}
let mut last_in_err = false;
loop {
let res = self.cfg.load(&self.flags.server).await;
match res {
Ok(()) => {
if last_in_err {
println!("loaded config");
}
break;
}
Err(e) => {
println!("error loading config: {e}, retrying in {CONFIG_RETRY_INTERVAL}s");
last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await;
}
};
}
let mut last_in_err = false;
loop {
let res = self.discovery().await;
match res {
Ok(o) => {
self.discovery = o;
if last_in_err {
println!("loaded discovery");
}
break;
}
Err(e) => {
println!("error loading disvoery: {e}, retrying in {CONFIG_RETRY_INTERVAL}s");
last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await;
}
};
}
if last_in_err {
println!("creating sas");
}
self.create_sas(&inotify).await?;
if last_in_err {
println!("created sas");
}
Ok(())
}
#[cfg(test)]
pub async fn get_blocklist_pending(&self) -> Vec<BlockIpData> {
let mut res: Vec<BlockIpData> = vec![];
for (_, ipblock) in self.blocklist.iter() {
res.push(ipblock.clone());
}
res
}
pub async fn get_blocklist_toblock(&self, all: bool) -> Vec<BlockIpData> {
let mut res: Vec<BlockIpData> = vec![];
for (_, ipblock) in self.blocklist.iter() {
match self.cfg.sets.get(&ipblock.ipdata.src) {
Some(set) => {
if ipblock.tryfail >= set.tryfail && (!ipblock.blocked || all) {
res.push(ipblock.clone());
}
}
None => {}
}
}
res
}
pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> {
match &ipevent.ipdata {
Some(ipdata) => match self.cfg.sets.get(&ipdata.src) {
Some(set) => {
let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str())
.unwrap()
.with_timezone(&chrono::Local);
let blocktime = set.blocktime;
let blocked = false;
let handle = u64::MIN;
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
let block =
self.blocklist
.entry(ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipdata.clone(),
tryfail: 0,
starttime,
blocktime,
blocked,
handle,
});
block.tryfail += 1;
block.blocktime = blocktime;
if block.tryfail >= set.tryfail {
return Some(ipevent.clone());
}
} else {
self.blocklist
.entry(ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
blocked,
handle,
});
}
}
None => {}
},
None => {}
}
None
}
pub async fn gc_blocklist(&mut self) -> Vec<BlockIpData> {
let mut removed: Vec<BlockIpData> = vec![];
let now: DateTime<Local> = Local::now().trunc_subsecs(0);
// nightly, future use
// let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate)
for (ip, blocked) in self.blocklist.clone().iter() {
/*match self.cfg.sets.get(&blocked.ipdata.src) {
Some(set) => {
let mut block = self.blocklist.get_mut(ip).unwrap();
block.blocktime = set.blocktime.clone();
}
None => {}
}*/
let mindate = now - Duration::minutes(blocked.blocktime);
if blocked.starttime < mindate {
self.blocklist.remove(&ip.clone()).unwrap();
removed.push(blocked.clone());
}
}
removed
}
pub async fn create_sas(
&mut self,
inotify: &Inotify,
) -> Result<(), Box<dyn std::error::Error>> {
for (src, set) in self.cfg.sets.iter() {
let p = Path::new(set.path.as_str());
if p.is_dir() {
let wd = match self.hashwd.get(&set.path.to_string()) {
Some(wd) => *wd,
None => {
let res = inotify
.add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY)
.unwrap();
self.hashwd.insert(set.path.to_string(), res);
res
}
};
let fullpath: String = match set.filename.as_str() {
"" => set.path.clone(),
_ => {
format!(
"{path}/{filename}",
path = set.path,
filename = set.filename.clone()
)
}
};
match self.sas.get_mut(&src.clone()) {
Some(s) => {
s.filename = set.filename.clone();
s.fullpath = fullpath;
s.set = set.clone();
s.regex = Regex::new(set.regex.as_str()).unwrap();
}
None => {
self.sas.insert(
src.clone(),
SetMap {
filename: set.filename.clone(),
fullpath,
set: set.clone(),
regex: Regex::new(set.regex.as_str()).unwrap(),
wd,
watchedfiles: HashMap::new(),
},
);
}
}
}
}
Ok(())
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Config {
pub sets: HashMap<String, SetCfg>,
#[serde(skip_serializing)]
pub trustnets: Vec<String>,
pub ws: HashMap<String, WebSocketCfg>,
pub api: String,
}
impl Config {
pub fn new() -> Self {
Self {
sets: HashMap::from([
("smtp".to_string(),
SetCfg {
src: "smtp".to_string(),
filename: "mail.log".to_string(),
regex: "(SASL LOGIN authentication failed)".to_string(),
path: "/var/log".to_string(),
blocktime: 60,
tryfail: 5,
}),
("ssh".to_string(),
SetCfg {
src: "ssh".to_string(),
filename: "auth.log".to_string(),
regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(),
path: "/var/log".to_string(),
blocktime: 60,
tryfail: 5,
},),
("http".to_string(),
SetCfg {
src: "http".to_string(),
filename: "".to_string(),
regex: "(anonymousfox.co)".to_string(),
path: "/var/log/nginx".to_string(),
blocktime: 60,
tryfail: 5,
},),
("openvpn".to_string(),
SetCfg {
src: "openvpn".to_string(),
filename: "status".to_string(),
regex: "(UNDEF)".to_string(),
path: "/var/run/openvpn".to_string(),
blocktime: 60,
tryfail: 5,
},),
]),
trustnets: vec![
"127.0.0.0/8".to_string(),
"10.0.0.0/8".to_string(),
"172.16.0.0/12".to_string(),
"192.168.0.0/16".to_string(),
],
ws: HashMap::from([("pubsub".to_string(),WebSocketCfg{
t: "pubsub".to_string(),
endpoint: format!("wss://{}/wsps", MASTERSERVER.to_string()),
subscription: WSSUBSCRIPTION.to_string(),
}),("reqrep".to_string(), WebSocketCfg {
t: "reqrep".to_string(),
endpoint: format!("wss://{}/wsrr", MASTERSERVER.to_string()),
subscription: WSSUBSCRIPTION.to_string(),
})]),
api: String::from("127.0.0.1:8060")
}
}
pub async fn load(&mut self, server: &String) -> Result<(), ReqError> {
self.get_config(server).await?;
Ok(())
}
async fn get_config(&mut self, server: &String) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient()
.get(format!("{server}/config?v=2"))
.send()
.await;
let req = match resp {
Ok(o) => o,
Err(e) => return Err(e),
};
let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await {
Ok(o) => o,
Err(e) => return Err(e),
};
for d in data.sets {
self.sets.insert(d.src.clone(), d);
}
self.trustnets = data.trustlists;
data.ws.into_iter().map(|x| x).for_each(|x| {
self.ws.insert(x.t.to_string(), x);
});
self.api = data
.cfg
.get(&"api".to_string())
.unwrap_or(&self.api)
.clone();
Ok(())
}
pub async fn _get_last(server: &String) -> Result<Vec<IpData>, ReqError> {
let resp = httpclient()
.get(format!("{server}/ips/last"))
.query(&[("interval", "3 hours")])
.send()
.await;
let req = match resp {
Ok(o) => o,
Err(e) => return Err(e),
};
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(o) => o,
Err(e) => return Err(e),
};
Ok(data)
}
pub fn build_trustnets(&self) -> Vec<IpNet> {
let mut trustnets: Vec<IpNet> = vec![];
for trustnet in &self.trustnets {
match trustnet.parse() {
Ok(net) => trustnets.push(net),
Err(e) => {
println!("error parsing {trustnet}, error: {e}");
}
};
}
trustnets
}
pub fn bootstrap_event(&self) -> IpEvent {
IpEvent {
msgtype: String::from("bootstrap"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: None,
}
}
}
pub fn httpclient() -> Client {
let client = Client::builder()
.user_agent(format!(
"{}/{}@{}/{}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
GIT_VERSION,
gethostname(false)
))
.timeout(Duration::seconds(WEB_CLIENT_TIMEOUT).to_std().unwrap())
.build()
.unwrap();
client
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct GlobalConfigV2 {
pub cfg: HashMap<String, String>,
pub sets: Vec<SetCfg>,
pub trustlists: Vec<String>,
pub ws: Vec<WebSocketCfg>,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SetCfg {
pub src: String,
pub filename: String,
pub regex: String,
pub path: String,
pub blocktime: i64,
pub tryfail: i64,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WebSocketCfg {
#[serde(rename = "type")]
pub t: String,
pub endpoint: String,
pub subscription: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Discovery {
pub version: String,
pub urls: HashMap<String, URL>,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct URL {
pub key: String,
pub path: String,
}
impl PartialEq for SetCfg {
fn eq(&self, other: &Self) -> bool {
self.src == other.src
}
}
impl Hash for SetCfg {
fn hash<H: Hasher>(&self, state: &mut H) {
self.src.hash(state);
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::ip::*;
use nix::sys::inotify::InitFlags;
use Context;
pub async fn prepare_test_data() -> Context {
let inotify = Inotify::init(InitFlags::empty()).unwrap();
let mut ctx = Context::new(&inotify).await;
let now: DateTime<Local> = Local::now().trunc_subsecs(0);
ctx.blocklist = HashMap::new();
for _i in 0..10 {
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ip: "1.1.1.1".to_string(),
hostname: "test1".to_string(),
date: now.to_rfc3339().to_string(),
src: "ssh".to_string(),
}),
})
.await;
}
for _ in 0..10 {
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ip: "1.1.1.2".to_string(),
hostname: "test2".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
})
.await;
}
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ip: "1.1.1.3".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
})
.await;
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
})
.await;
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
})
.await;
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 6,
ip: "2a00:1450:4007:805::2003".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
})
.await;
let ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap();
ip1.starttime = DateTime::from(now) - Duration::minutes(61);
let ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap();
ip2.starttime = DateTime::from(now) - Duration::minutes(62);
ctx
}
#[tokio::test]
pub async fn test_blocklist_pending() {
let ctx = prepare_test_data().await;
let pending = ctx.get_blocklist_pending().await;
assert_eq!(pending.len(), 5);
let ips = [
"1.1.1.1",
"1.1.1.2",
"1.1.1.3",
"1.1.1.4",
"2a00:1450:4007:805::2003",
];
for i in ips {
let ip = ctx
.blocklist
.get(&i.to_string())
.unwrap()
.ipdata
.ip
.as_str();
assert_eq!(ip, i);
}
}
#[tokio::test]
pub async fn test_blocklist_toblock() {
let mut ctx = prepare_test_data().await;
ctx.gc_blocklist().await;
let toblock = ctx.get_blocklist_toblock(false).await;
assert_eq!(toblock.len(), 3);
}
#[tokio::test]
pub async fn test_blocklist_gc() {
let mut ctx = prepare_test_data().await;
let after_gc = ctx.gc_blocklist().await;
assert_eq!(after_gc.len(), 2);
let ips = &["1.1.1.3", "1.1.1.4"];
for ip in ips {
let ipstring = ip.to_string();
assert_eq!(ctx.blocklist.get(&ipstring).unwrap().ipdata.ip, ipstring);
}
}
}

384
src/config/mod.rs Normal file
View File

@ -0,0 +1,384 @@
use crate::ip::*;
use crate::utils::*;
use chrono::prelude::*;
use chrono::Duration;
use clap::{Arg, ArgMatches, Command};
use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify, WatchDescriptor};
use regex::Regex;
use reqwest::{Client, Error as ReqError, Response};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::path::Path;
const SERVER: &str = "ipbl.paulbsd.com";
#[derive(Debug, Clone)]
pub struct Context {
pub blocklist: HashMap<String, IpData>,
pub cfg: Config,
pub client: Client,
pub discovery: Discovery,
pub flags: Flags,
pub hostname: String,
pub instance: Box<Inotify>,
pub sas: HashMap<String, SetMap>,
pub hashwd: HashMap<String, WatchDescriptor>,
}
#[derive(Debug, Clone)]
pub struct SetMap {
pub filename: String,
pub fullpath: String,
pub regex: Regex,
pub set: Set,
pub watchedfiles: HashMap<String, u64>,
pub wd: WatchDescriptor,
}
#[derive(Debug, Clone)]
pub struct Flags {
pub debug: bool,
pub interval: usize,
pub server: String,
}
impl Context {
pub async fn new() -> Self {
// Get flags
let debug = Context::argparse().is_present("debug");
let server = Context::argparse()
.value_of("server")
.unwrap_or(format!("https://{}", SERVER).as_str())
.to_string();
// Build context
let mut ctx = Context {
cfg: Config::new(),
flags: Flags {
debug: debug,
server: server,
interval: 60,
},
hostname: gethostname(true),
discovery: Discovery {
version: "1.0".to_string(),
urls: HashMap::new(),
},
client: Client::builder()
.user_agent(format!(
"{}/{}@{}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
gethostname(false)
))
.build()
.unwrap(),
sas: HashMap::new(),
instance: Box::new(Inotify::init(InitFlags::empty()).unwrap()),
blocklist: HashMap::new(),
hashwd: HashMap::new(),
};
ctx.discovery = ctx.discovery().await.unwrap();
print!("Loading config ... ");
match ctx.load().await {
Ok(_) => {}
Err(err) => {
println!("error loading config: {err}");
std::process::exit(1);
}
}
ctx
}
pub fn argparse() -> ArgMatches {
Command::new(env!("CARGO_PKG_NAME"))
.version(env!("CARGO_PKG_VERSION"))
.author(env!("CARGO_PKG_AUTHORS"))
.about(env!("CARGO_PKG_DESCRIPTION"))
.arg(
Arg::new("server")
.short('s')
.long("server")
.value_name("server")
.default_value("https://ipbl.paulbsd.com")
.help("Sets a http server")
.takes_value(true),
)
.arg(
Arg::new("debug")
.short('d')
.takes_value(false)
.help("Enable debugging"),
)
.get_matches()
}
pub async fn discovery(&self) -> Result<Discovery, ReqError> {
let resp: Result<Response, ReqError> = self
.client
.get(format!("{server}/discovery", server = self.flags.server))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Discovery = match req.json().await {
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
}
pub async fn load(&mut self) -> Result<(), Box<dyn std::error::Error>> {
self.cfg.load(self.to_owned()).await?;
self.create_sas().await?;
Ok(())
}
pub async fn get_blocklist(&self) -> Vec<IpData> {
let mut res: Vec<IpData> = vec![];
for (_, v) in self.blocklist.iter() {
res.push(v.clone());
}
res
}
pub async fn gc_blocklist(&mut self) -> Vec<IpData> {
let now: DateTime<Local> = Local::now().trunc_subsecs(0);
let delta: Duration = Duration::minutes(self.flags.interval as i64);
let mindate = now - delta;
let mut toremove: Vec<IpData> = vec![];
// nightly, future use
//let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate)
for (k, v) in self.blocklist.clone().iter() {
if v.parse_date() < mindate {
self.blocklist.remove(&k.to_string()).unwrap();
toremove.push(v.clone());
}
}
toremove
}
pub async fn update_blocklist(&mut self, ip: &IpData) {
if !self.blocklist.contains_key(&ip.ip) {
self.blocklist.insert(ip.ip.clone(), ip.clone());
}
}
pub async fn create_sas(&mut self) -> Result<(), Box<dyn std::error::Error>> {
for set in &self.cfg.sets {
let p = Path::new(set.path.as_str());
if p.is_dir() {
let res = match self.hashwd.get(&set.path.to_string()) {
Some(wd) => *wd,
None => {
let res = self
.instance
.add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY)
.unwrap();
self.hashwd.insert(set.path.to_string(), res);
res
}
};
let fullpath: String = match set.filename.as_str() {
"" => set.path.clone(),
_ => {
format!(
"{path}/{filename}",
path = set.path,
filename = set.filename.clone()
)
}
};
self.sas.insert(
set.t.clone(),
SetMap {
filename: set.filename.clone(),
fullpath: fullpath,
set: set.clone(),
regex: Regex::new(set.regex.as_str()).unwrap(),
wd: res,
watchedfiles: HashMap::new(),
},
);
}
}
Ok(())
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Config {
pub sets: Vec<Set>,
#[serde(skip_serializing)]
pub trustnets: Vec<String>,
pub zmq: HashMap<String, ZMQ>,
}
impl Config {
pub fn new() -> Self {
Self {
sets: vec![
Set {
t: "smtp".to_string(),
filename: "mail.log".to_string(),
regex: "(SASL LOGIN authentication failed)".to_string(),
path: "/var/log".to_string(),
},
Set {
t: "ssh".to_string(),
filename: "auth.log".to_string(),
regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(),
path: "/var/log".to_string(),
},
Set {
t: "http".to_string(),
filename: "".to_string(),
regex: "(anonymousfox.co)".to_string(),
path: "/var/log/nginx".to_string(),
}
,Set {
t: "openvpn".to_string(),
filename: "status".to_string(),
regex: "(UNDEF)".to_string(),
path: "/var/run/openvpn".to_string(),
},
],
trustnets: vec![
"127.0.0.0/8".to_string(),
"10.0.0.0/8".to_string(),
"172.16.0.0/12".to_string(),
"192.168.0.0/16".to_string(),
],
zmq: HashMap::from([("pubsub".to_string(),ZMQ{
t: "pubsub".to_string(),
hostname: SERVER.to_string(),
port: 9999,
subscription: "ipbl".to_string(),
}),("reqrep".to_string(),ZMQ {
t: "reqrep".to_string(),
hostname: SERVER.to_string(),
port: 9998,
subscription: String::new(),
})])
}
}
pub async fn load(&mut self, ctx: Context) -> Result<(), ReqError> {
self.get_sets(&ctx).await?;
self.get_trustnets(&ctx).await?;
self.get_zmq_config(&ctx).await?;
Ok(())
}
async fn get_trustnets(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx
.client
.get(format!(
"{server}/config/trustlist",
server = ctx.flags.server
))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<String> = match req.json::<Vec<String>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
self.trustnets = data;
Ok(())
}
async fn get_sets(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx
.client
.get(format!("{server}/config/sets", server = ctx.flags.server))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<Set> = match req.json::<Vec<Set>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
self.sets = data;
Ok(())
}
async fn get_zmq_config(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx
.client
.get(format!("{server}/config/zmq", server = ctx.flags.server))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: HashMap<String, ZMQ> = match req.json::<Vec<ZMQ>>().await {
Ok(res) => {
let mut out: HashMap<String, ZMQ> = HashMap::new();
res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.t.to_string(), x);
});
out
}
Err(err) => return Err(err),
};
self.zmq = data;
Ok(())
}
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Set {
#[serde(rename = "type")]
pub t: String,
pub filename: String,
pub regex: String,
pub path: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ZMQ {
#[serde(rename = "type")]
pub t: String,
pub hostname: String,
pub port: i64,
pub subscription: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Discovery {
pub version: String,
pub urls: HashMap<String, URL>,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct URL {
pub key: String,
pub path: String,
}
impl PartialEq for Set {
fn eq(&self, other: &Self) -> bool {
self.t == other.t
}
}
impl Eq for Set {}
impl Hash for Set {
fn hash<H: Hasher>(&self, state: &mut H) {
self.t.hash(state);
}
}

95
src/firewall/mod.rs Normal file
View File

@ -0,0 +1,95 @@
use crate::ip::*;
use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
use std::{ffi::CString, io::*, net::Ipv4Addr};
pub fn init(tablename: &String) -> (Batch, Table) {
let mut batch = Batch::new();
let table = Table::new(
&CString::new(tablename.as_str()).unwrap(),
ProtoFamily::Ipv4,
);
batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del);
batch.add(&table, nftnl::MsgType::Add);
(batch, table)
}
pub fn block(
tablename: &String,
ips_add: &Vec<IpData>,
ret: &mut Vec<String>,
) -> std::result::Result<(), Error> {
// convert chain
let ips_add = convert(ips_add);
let (mut batch, table) = init(tablename);
// build chain
let mut chain = Chain::new(&CString::new(tablename.as_str()).unwrap(), &table);
chain.set_hook(nftnl::Hook::In, 1);
chain.set_policy(nftnl::Policy::Accept);
// add chain
batch.add(&chain, nftnl::MsgType::Add);
// build and add rules
for ip in ips_add.clone() {
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(payload ipv4 saddr));
rule.add_expr(&nft_expr!(cmp == ip));
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict drop));
batch.add(&rule, nftnl::MsgType::Add);
}
// validate and send batch
let finalized_batch = batch.finalize();
send_and_process(&finalized_batch)?;
ret.push(format!(
"nftables: {length} ip in memory",
length = ips_add.len()
));
Ok(())
}
fn send_and_process(batch: &FinalizedBatch) -> std::result::Result<(), Error> {
let seq: u32 = 2;
let socket = mnl::Socket::new(mnl::Bus::Netfilter)?;
socket.send_all(batch)?;
let mut buffer = vec![0; nftnl::nft_nlmsg_maxsize() as usize];
while let Some(message) = socket_recv(&socket, &mut buffer[..])? {
match mnl::cb_run(message, seq, socket.portid())? {
mnl::CbResult::Stop => {
break;
}
mnl::CbResult::Ok => (),
}
}
Ok(())
}
fn socket_recv<'a>(
socket: &mnl::Socket,
buf: &'a mut [u8],
) -> std::result::Result<Option<&'a [u8]>, Error> {
let ret = socket.recv(buf)?;
if ret > 0 {
Ok(Some(&buf[..ret]))
} else {
Ok(None)
}
}
fn convert(input: &Vec<IpData>) -> Vec<Ipv4Addr> {
let mut output: Vec<Ipv4Addr> = vec![];
for val in input {
output.push(val.ip.parse::<Ipv4Addr>().unwrap());
}
output
}

208
src/fw.rs
View File

@ -1,208 +0,0 @@
use crate::{config::Context, ip::BlockIpData, ipblc::PKG_NAME};
use std::{
io::Error,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::Arc,
};
use tokio::sync::RwLock;
use rustables::{expr::*, *};
pub enum FwTableType {
IPv4,
IPv6,
}
#[allow(dead_code)]
pub enum FwAction {
Add,
Delete,
}
macro_rules! initrules {
($batch:expr, $table:expr, $chain:ident, $reset:expr) => {
$chain.set_hook(Hook::new(HookClass::In, 1));
$batch.add(&$chain, MsgType::Add);
if $reset {
$batch.add(&Rule::new(&$chain).unwrap(), MsgType::Del);
}
};
}
macro_rules! makerules {
($ipdata:ident, $chain:ident, $batch:ident, $t:ty, $ip_t:ident,$action:ty) => {
let ip = $ipdata.ipdata.ip.parse::<$t>().unwrap();
Rule::new(&$chain)
.unwrap()
.saddr(ip.into())
.drop()
.add_to_batch(&mut $batch);
};
}
pub fn fwglobalinit(t: FwTableType, reset: bool) -> (Batch, Chain) {
let table_name: String;
let table: Table;
let mut chain: Chain;
match t {
FwTableType::IPv4 => {
table_name = format!("{PKG_NAME}4");
table = Table::new(ProtocolFamily::Ipv4).with_name(table_name);
chain = Chain::new(&table)
.with_policy(ChainPolicy::Accept)
.with_name(PKG_NAME);
}
FwTableType::IPv6 => {
table_name = format!("{PKG_NAME}6");
table = Table::new(ProtocolFamily::Ipv6).with_name(table_name);
chain = Chain::new(&table)
.with_policy(ChainPolicy::Accept)
.with_name(PKG_NAME);
}
}
let mut batch = Batch::new();
batch.add(&table, MsgType::Add);
initrules!(batch, table, chain, reset);
(batch, chain)
}
pub fn fwblock<'a>(ip_add: &BlockIpData) -> std::result::Result<&String, error::QueryError> {
let (mut batch4, chain4) = fwglobalinit(FwTableType::IPv4, false);
let (mut batch6, chain6) = fwglobalinit(FwTableType::IPv6, false);
match ip_add.ipdata.t {
4 => {
makerules!(ip_add, chain4, batch4, Ipv4Addr, ipv4, FwAction::Add);
match batch4.send() {
Ok(_) => {}
Err(e) => {
println!("block not ok {e} {ip_add:?}")
}
}
}
6 => {
makerules!(ip_add, chain6, batch6, Ipv6Addr, ipv6, FwAction::Add);
match batch6.send() {
Ok(_) => {}
Err(e) => {
println!("block not ok {e} {ip_add:?}")
}
}
}
_ => {}
}
Ok(&ip_add.ipdata.ip)
}
pub fn fwunblock<'a>(ip_del: &BlockIpData) -> std::result::Result<&String, error::QueryError> {
let (mut batch4, chain4) = fwglobalinit(FwTableType::IPv4, false);
let (mut batch6, chain6) = fwglobalinit(FwTableType::IPv6, false);
match ip_del.ipdata.t {
4 => {
let r = Rule::new(&chain4).unwrap().with_handle(ip_del.handle);
batch4.add(&r, MsgType::Del);
match batch4.send() {
Ok(_) => {}
Err(e) => {
println!("delete not ok {e} {ip_del:?}")
}
}
}
6 => {
let r = Rule::new(&chain6).unwrap().with_handle(ip_del.handle);
batch6.add(&r, MsgType::Del);
match batch6.send() {
Ok(_) => {}
Err(e) => {
println!("delete not ok {e} {ip_del:?}")
}
}
}
_ => {}
}
Ok(&ip_del.ipdata.ip)
}
pub async fn get_current_rules(
ctx: &Arc<RwLock<Context>>,
ret: &mut Vec<String>,
fwlen: &mut usize,
) -> Result<(), Error> {
let mut ips_all_count = 0;
let tables = vec![format!("{PKG_NAME}4"), format!("{PKG_NAME}6")];
for table_name in tables {
let get_table = || -> Result<Option<Table>, Error> {
let tables = list_tables().unwrap();
for table in tables {
if let Some(name) = table.get_name() {
if *name == table_name {
return Ok(Some(table));
}
}
}
Ok(None)
};
let get_chain = |table: &Table| -> Result<Option<Chain>, Error> {
let chains = list_chains_for_table(table).unwrap();
for chain in chains {
if let Some(name) = chain.get_name() {
if *name == "ipblc" {
return Ok(Some(chain));
}
}
}
Ok(None)
};
let table = get_table()?.expect("no table?");
let chain = get_chain(&table)?.expect("no chain?");
let mut ctx = { ctx.write().await };
let rules = list_rules_for_chain(&chain).unwrap().clone();
for (ip, c) in ctx.blocklist.iter_mut() {
let ip_parsed: IpAddr = ip.parse().unwrap();
let cmprule = Rule::new(&chain).unwrap().saddr(ip_parsed).drop();
let mut gexpr = RawExpression::default();
for e in cmprule.get_expressions().unwrap().iter() {
if let Some(ExpressionVariant::Cmp(_ip)) = e.get_data() {
gexpr = e.clone();
}
}
for rule in rules.iter() {
for expr in rule.get_expressions().unwrap().iter() {
if let Some(expr::ExpressionVariant::Cmp(_)) = expr.get_data() {
if gexpr == expr.clone() {
ips_all_count += 1;
c.handle = *rule.get_handle().unwrap();
}
}
}
}
}
}
if *fwlen != ips_all_count {
ret.push(format!("{length} ip in firewall", length = ips_all_count));
}
*fwlen = ips_all_count;
Ok(())
}
#[allow(dead_code)]
fn fw_rules_count() -> i64 {
0
}

234
src/ip.rs
View File

@ -1,18 +1,16 @@
use crate::utils::gethostname; use crate::config::Context;
use crate::utils::*;
use std::{
cmp::Ordering,
fmt::{Display, Formatter},
io::{BufRead, BufReader, Read},
net::IpAddr,
};
use chrono::offset::LocalResult;
use chrono::prelude::*; use chrono::prelude::*;
use ipnet::IpNet; use ipnet::IpNet;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use reqwest::Error as ReqError;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::io::{BufRead, BufReader, Read};
use std::net::IpAddr;
lazy_static! { lazy_static! {
static ref R_IPV4: Regex = Regex::new(include_str!("regexps/ipv4.txt")).unwrap(); static ref R_IPV4: Regex = Regex::new(include_str!("regexps/ipv4.txt")).unwrap();
@ -20,64 +18,18 @@ lazy_static! {
static ref R_DATE: Regex = Regex::new(include_str!("regexps/date.txt")).unwrap(); static ref R_DATE: Regex = Regex::new(include_str!("regexps/date.txt")).unwrap();
} }
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IpEvent {
pub msgtype: String,
pub mode: String,
pub hostname: String,
pub ipdata: Option<IpData>,
}
#[macro_export]
macro_rules! ipevent {
($msgtype:expr,$mode:expr,$hostname:expr,$ipdata:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: $ipdata,
}
};
($msgtype:expr,$mode:expr,$hostname:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: None,
}
};
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlockIpData {
pub ipdata: IpData,
pub tryfail: i64,
pub blocktime: i64,
pub starttime: DateTime<Local>,
pub blocked: bool,
pub handle: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq)] #[derive(Clone, Debug, Serialize, Deserialize, Eq)]
pub struct IpData { pub struct IpData {
pub t: isize,
pub ip: String, pub ip: String,
pub src: String, pub src: String,
pub date: String, pub date: String,
pub hostname: String, pub hostname: String,
} }
#[macro_export] impl IpData {
macro_rules! ipdata { pub fn parse_date(&self) -> DateTime<FixedOffset> {
($t:expr,$ip:expr,$src:expr,$date:expr,$hostname:expr) => { DateTime::parse_from_rfc3339(self.date.as_str()).unwrap()
IpData { }
t: $t.clone(),
ip: $ip.clone(),
src: $src.clone(),
date: $date.clone(),
hostname: $hostname.clone(),
}
};
} }
impl PartialEq for IpData { impl PartialEq for IpData {
@ -114,38 +66,116 @@ impl Display for IpData {
} }
} }
pub async fn push_ip(ctx: &Context, ip: &IpData, ret: &mut Vec<String>) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
data.push(IpData {
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
});
let resp = ctx
.client
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}
pub async fn _push_ip_bulk(
ctx: &Context,
ips: &Vec<IpData>,
ret: &mut Vec<String>,
) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
for ip in ips {
data.push(IpData {
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
})
}
let resp = ctx
.client
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}
pub fn filter( pub fn filter(
reader: Box<dyn Read>, lines: Box<dyn Read>,
iplist: &mut Vec<IpData>, list: &mut Vec<IpData>,
trustnets: &Vec<IpNet>, trustnets: &Vec<IpNet>,
regex: &Regex, regex: &Regex,
src: &String, src: &String,
last: &DateTime<Local>, lastprocess: &DateTime<Local>,
) -> isize { ) -> isize {
let mut ips = 0; let mut ips = 0;
let hostname = gethostname(true); let hostname = gethostname(true);
let lines = BufReader::new(reader).lines(); for line in BufReader::new(lines).lines() {
for line in lines.into_iter() {
if let Ok(l) = line { if let Ok(l) = line {
if regex.is_match(l.as_str()) { if regex.is_match(l.as_str()) {
let s_ipaddr: String; let s_ipaddr: String;
let t: isize;
match R_IPV4.captures(l.as_str()) { match R_IPV4.captures(l.as_str()) {
Some(sv4) => { Some(sv4) => {
s_ipaddr = sv4.get(0).unwrap().as_str().to_string(); s_ipaddr = sv4.get(0).unwrap().as_str().to_string();
t = 4;
} }
None => { None => {
match R_IPV6.captures(l.as_str()) { continue;
/*match R_IPV6.captures(l.as_str()) {
Some(sv6) => { Some(sv6) => {
s_ipaddr = sv6.get(0).unwrap().as_str().to_string(); s_ipaddr = sv6.get(0).unwrap().as_str().to_string();
t = 6;
} }
None => { None => {
continue; continue;
} }
}; };*/
}
};
let s_date: DateTime<Local>;
match R_DATE.captures(l.as_str()) {
Some(sdt) => {
s_date = parse_date(sdt);
if &s_date < lastprocess {
continue;
}
}
None => {
s_date = Local::now();
} }
}; };
@ -157,21 +187,13 @@ pub fn filter(
} }
}; };
let s_date: DateTime<Local>;
match R_DATE.captures(l.as_str()) {
Some(sdt) => {
s_date = parse_date(sdt);
if &s_date < last {
continue;
}
}
None => {
s_date = Local::now();
}
};
if !is_trusted(&ipaddr, &trustnets) { if !is_trusted(&ipaddr, &trustnets) {
iplist.push(ipdata!(t, s_ipaddr, src, s_date.to_rfc3339(), hostname)); list.push(IpData {
ip: s_ipaddr,
src: src.to_owned(),
date: s_date.to_rfc3339().to_owned(),
hostname: hostname.to_owned(),
});
ips += 1; ips += 1;
}; };
} }
@ -181,24 +203,21 @@ pub fn filter(
} }
fn parse_date(input: regex::Captures) -> DateTime<Local> { fn parse_date(input: regex::Captures) -> DateTime<Local> {
let mut ymd: Vec<u32> = vec![]; let mut ymd: Vec<u64> = vec![];
let mut hms: Vec<u32> = vec![]; let mut hms: Vec<u64> = vec![];
let ymd_range = 2..5;
let hms_range = 5..8;
for cap in ymd_range { let (daterange, hourrange) = (2..5, 5..8);
ymd.push(input.get(cap).unwrap().as_str().parse::<u32>().unwrap());
for i in daterange {
ymd.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
} }
for cap in hms_range { for i in hourrange {
hms.push(input.get(cap).unwrap().as_str().parse::<u32>().unwrap()); hms.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
} }
let date: DateTime<Local> = let date = Local
match Local.with_ymd_and_hms(ymd[0] as i32, ymd[1], ymd[2], hms[0], hms[1], hms[2]) { .ymd(ymd[0] as i32, ymd[1] as u32, ymd[2] as u32)
LocalResult::Single(s) => s, .and_hms(hms[0] as u32, hms[1] as u32, hms[2] as u32);
LocalResult::Ambiguous(a, _b) => a,
LocalResult::None => Local::now().trunc_subsecs(0),
};
date date
} }
@ -210,3 +229,24 @@ fn is_trusted(ip: &IpAddr, trustnets: &Vec<IpNet>) -> bool {
} }
false false
} }
pub async fn _get_last(ctx: &Context) -> Result<Vec<IpData>, ReqError> {
let resp = ctx
.client
.get(format!("{server}/ips/last", server = ctx.flags.server))
.query(&[("interval", "3 hours")])
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
}

View File

@ -1,326 +0,0 @@
use crate::config::{Context, GIT_VERSION};
use crate::fw::*;
use crate::ip::{filter, IpData, IpEvent};
use crate::ipevent;
use crate::monitoring::apiserver;
use crate::utils::{gethostname, read_lines, sleep_s};
use crate::webservice::send_to_ipbl_api;
use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep};
use std::{collections::HashMap, sync::Arc};
use chrono::prelude::*;
use chrono::prelude::{DateTime, Local};
use chrono::Duration;
use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent};
use sd_notify::*;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
const BL_CHAN_SIZE: usize = 32;
const WS_CHAN_SIZE: usize = 64;
macro_rules! log_with_systemd {
($msg:expr) => {
println!("{}", $msg);
notify(false, &[NotifyState::Status(format!("{}", $msg).as_str())]).unwrap();
};
}
pub async fn run() {
let inotify = Inotify::init(InitFlags::empty()).unwrap();
let globalctx = Context::new(&inotify).await;
let ctxarc = Arc::new(RwLock::new(globalctx));
let (batch4, _) = fwglobalinit(FwTableType::IPv4, true);
let (batch6, _) = fwglobalinit(FwTableType::IPv6, true);
batch4.send().unwrap();
batch6.send().unwrap();
let mut fwlen: usize = 0;
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
log_with_systemd!(format!("Launching {}, version {}", PKG_NAME, pkgversion));
let ctxapi = Arc::clone(&ctxarc);
apiserver(&ctxapi).await.unwrap();
// initialize sockets
let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(WS_CHAN_SIZE);
let ipeventtxarc = Arc::new(RwLock::new(ipeventtx));
// init pubsub
let ctxwsps = Arc::clone(&ctxarc);
let ipeventws = Arc::clone(&ipeventtxarc);
websocketpubsub(&ctxwsps, ipeventws).await;
let ctxwsrr = Arc::clone(&ctxarc);
let mut wssocketrr = websocketreqrep(&ctxwsrr).await;
// init file watcher
let inoarc = Arc::new(RwLock::new(inotify));
let inoclone = Arc::clone(&inoarc);
let mut blrx = watchfiles(inoclone).await;
let ctxclone = Arc::clone(&ctxarc);
let ipeventclone = Arc::clone(&ipeventtxarc);
tokio::spawn(async move {
compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
});
notify(false, &[NotifyState::Ready]).unwrap();
loop {
let mut ret: Vec<String> = Vec::new();
let ctxclone = Arc::clone(&ctxarc);
let reloadinterval;
{
let ctx = ctxclone.read().await;
reloadinterval = ctx.reloadinterval;
}
tokio::select! {
ipevent = ipeventrx.recv() => {
let received_ip = ipevent.unwrap();
let (toblock,server) = {
let ctx = ctxclone.read().await;
(ctx.get_blocklist_toblock(true).await,ctx.flags.server.clone())
};
if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in toblock {
let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send.ipdata));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
break;
}
}
continue
}
// refresh context blocklist
let filtered_ipevent = {
ctxarc.write().await.update_blocklist(&received_ip).await
};
// send ip list to api and ws sockets
if let Some(ipevent) = filtered_ipevent {
if received_ip.msgtype != "init" {
log_with_systemd!(format!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip));
let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata);
send_to_ipbl_api(&server.clone(), &ipe).await;
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
continue;
}
}
}
}
_val = sleep_s(reloadinterval) => {
let ipe = ipevent!("ping", "ws", gethostname(true));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
}
}
};
let ctxclone = Arc::clone(&ctxarc);
let ipstounblock = {
let mut ctx = ctxclone.write().await;
ctx.gc_blocklist().await
};
let ipstoblock = {
let ctx = ctxclone.read().await;
ctx.get_blocklist_toblock(false).await
};
get_current_rules(&ctxarc, &mut ret, &mut fwlen)
.await
.unwrap();
for ip in ipstoblock {
match fwblock(&ip) {
Ok(ip) => {
let mut ctx = ctxclone.write().await;
if let Some(x) = ctx.blocklist.get_mut(ip) {
x.blocked = true;
}
}
Err(e) => {
println!("err: {e}, unable to push firewall rules, use super user")
}
};
}
for ip in ipstounblock {
if ip.blocked {
match fwunblock(&ip) {
Ok(_) => {}
Err(e) => {
println!("err: {e}, unable to push firewall rules, use super user")
}
};
}
}
// log lines
if ret.len() > 0 {
let result = ret.join(", ");
log_with_systemd!(format!("{result}"));
}
let ctxclone = Arc::clone(&ctxarc);
let inoclone = Arc::clone(&inoarc);
handle_cfg_reload(&ctxclone, reloadinterval, &mut last_cfg_reload, inoclone).await;
}
}
async fn handle_cfg_reload(
ctxclone: &Arc<RwLock<Context>>,
reloadinterval: u64,
last_cfg_reload: &mut DateTime<Local>,
inoarc: Arc<RwLock<Inotify>>,
) {
let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(reloadinterval as i64) {
let inotify;
loop {
inotify = match inoarc.try_read() {
Ok(o) => o,
Err(e) => {
println!("{e}");
sleep_s(1).await;
continue;
}
};
break;
}
let mut ctx = match ctxclone.try_write() {
Ok(o) => o,
Err(e) => {
println!("{e}");
return;
}
};
match ctx.load(&inotify).await {
Ok(_) => {
*last_cfg_reload = Local::now().trunc_subsecs(0);
}
Err(_) => {
println!("error reloading config");
}
}
};
}
async fn watchfiles(inoarc: Arc<RwLock<Inotify>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
tokio::spawn(async move {
loop {
let events = inoarc.read().await.read_events().unwrap();
for inevent in events {
let date: DateTime<Local> = Local::now().trunc_subsecs(0);
bltx.send(FileEvent { inevent, date }).await.unwrap();
}
}
});
blrx
}
async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, bool) {
let currentlen = match std::fs::metadata(&path.to_string()) {
Ok(u) => u.len(),
Err(_) => 0u64,
};
let lastlen = match w.insert(path.to_string(), currentlen) {
Some(u) => u,
None => currentlen,
};
(lastlen, lastlen != currentlen)
}
async fn compare_files_changes(
ctxarc: &Arc<RwLock<Context>>,
inrx: &mut Receiver<FileEvent>,
ipeventtx: &Arc<RwLock<Sender<IpEvent>>>,
) {
let mut tnets;
loop {
let modfiles = inrx.recv().await.unwrap();
let mut iplist: Vec<IpData> = vec![];
let sas = {
let ctx = ctxarc.read().await;
tnets = ctx.cfg.build_trustnets();
ctx.sas.clone()
};
match modfiles.inevent.name {
Some(name) => {
let filename = name.to_str().unwrap();
for (sak, sa) in sas.clone().iter_mut() {
if modfiles.inevent.wd == sa.wd {
let handle: String;
if sa.filename.as_str() == "" {
handle = format!("{}/{}", &sa.fullpath, filename);
} else if filename.starts_with(sa.filename.as_str()) {
handle = sa.fullpath.to_owned();
} else {
continue;
}
let (filesize, sizechanged) = {
let mut ctx = ctxarc.write().await;
let sa = ctx.sas.get_mut(sak).unwrap();
get_last_file_size(&mut sa.watchedfiles, &handle).await
};
if !sizechanged {
continue;
}
match read_lines(&handle, filesize) {
Some(lines) => {
filter(
lines,
&mut iplist,
&tnets,
&sa.regex,
&sa.set.src,
&modfiles.date,
);
}
None => {}
};
break;
}
}
for ip in iplist {
let ipe = ipevent!("add", "file", gethostname(true), Some(ip));
let ipetx = ipeventtx.read().await;
ipetx.send(ipe).await.unwrap();
}
}
None => {}
}
}
}
pub struct FileEvent {
pub inevent: InotifyEvent,
pub date: DateTime<Local>,
}
impl std::fmt::Debug for FileEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{ie:?}", ie = self.inevent)
}
}

180
src/ipblc/inc.rs Normal file
View File

@ -0,0 +1,180 @@
use super::*;
use chrono::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
const BL_CHAN_SIZE: usize = 32;
const ZMQ_CHAN_SIZE: usize = 64;
pub async fn process(ctx: &Arc<Mutex<Context>>) {
println!(
"Launching {} version {}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION")
);
let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE);
// initialize the firewall table
//firewall::init(&env!("CARGO_PKG_NAME").to_string());
// initialize zeromq sockets
let reqsocket;
let subsocket;
{
let ctxarc = Arc::clone(&ctx);
let zmqctx = ctxarc.lock().await;
reqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await
.unwrap();
subsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
.await
.unwrap();
}
listenpubsub(&ctx, ipdatatx.clone(), subsocket).await;
let mut blrx = watchfiles(&ctx).await;
let ctxarc = Arc::clone(&ctx);
tokio::spawn(async move {
compare_files_changes(&ctxarc, &mut blrx, &ipdatatx).await;
});
loop {
let mut ret: Vec<String> = Vec::new();
// wait for logs parse and zmq channel receive
let ip = ipdatarx.recv().await.unwrap();
// lock the context mutex
let ctxarc = Arc::clone(&ctx);
let mut ctx = ctxarc.lock().await;
// refresh context blocklist
ctx.update_blocklist(&ip).await;
ctx.gc_blocklist().await;
// send ip list to ws and zmq sockets
if ip.hostname == ctx.hostname {
send_to_ipbl_ws(&ctx, &ip, &mut ret).await;
send_to_ipbl_zmq(&reqsocket, &ip).await;
}
// apply firewall blocking
firewall::block(
&env!("CARGO_PKG_NAME").to_string(),
&ctx.get_blocklist().await,
&mut ret,
)
.unwrap();
// log lines
println!("{ret}", ret = ret.join(", "));
// reload configuration from the server
match ctx.load().await {
Ok(_) => {}
Err(err) => {
println!("error loading config: {err}");
}
}
}
}
async fn watchfiles(ctx: &Arc<Mutex<Context>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
let ctx = Arc::clone(ctx);
tokio::spawn(async move {
loop {
let events: Vec<InotifyEvent>;
{
let ctx = ctx.lock().await;
events = ctx.instance.read_events().unwrap();
}
for event in events {
let date: DateTime<Local> = Local::now().trunc_subsecs(0);
bltx.send(FileEvent {
ie: event,
date: date,
})
.await
.unwrap();
}
}
});
blrx
}
async fn get_last_file_size(watchedfiles: &mut HashMap<String, u64>, path: &str) -> u64 {
let currentlen = match std::fs::metadata(&path.to_string()) {
Ok(u) => u.len().clone(),
Err(_) => 0u64,
};
let lastlen = match watchedfiles.insert(path.to_string(), currentlen) {
Some(u) => u,
None => 0,
};
lastlen
}
async fn compare_files_changes(
ctx: &Arc<Mutex<Context>>,
inotifyrx: &mut Receiver<FileEvent>,
ipdatatx: &Sender<IpData>,
) {
let mut trustnets;
loop {
let modifiedfiles = inotifyrx.recv().await.unwrap();
let mut list: Vec<IpData> = vec![];
let mut ctx = ctx.lock().await;
trustnets = build_trustnets(&ctx.cfg.trustnets);
match modifiedfiles.ie.name {
Some(name) => {
let inotify_filename = name.to_str().unwrap();
for sak in &mut ctx.clone().sas.keys() {
let sa = &mut ctx.sas.get_mut(sak).unwrap();
if modifiedfiles.ie.wd == sa.wd {
let handle_filename: String;
if sa.filename.as_str() == "" {
handle_filename = format!("{}/{}", &sa.fullpath, inotify_filename);
} else if inotify_filename.starts_with(sa.filename.as_str()) {
handle_filename = sa.fullpath.to_owned();
} else {
continue;
}
let filesize =
get_last_file_size(&mut sa.watchedfiles, &handle_filename).await;
match read_lines(&handle_filename, filesize) {
Some(lines) => {
filter(
lines,
&mut list,
&trustnets,
&sa.regex,
&sa.set.t,
&modifiedfiles.date,
);
}
None => {}
};
break;
}
}
drop(ctx);
for ip in list {
ipdatatx.send(ip).await.unwrap();
}
}
None => {}
}
}
}

79
src/ipblc/mod.rs Normal file
View File

@ -0,0 +1,79 @@
pub mod inc;
use crate::config::*;
use crate::firewall;
use crate::ip::*;
use crate::utils::*;
use crate::zmqcom::*;
use chrono::prelude::{DateTime, Local};
use nix::sys::inotify::InotifyEvent;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex;
pub struct FileEvent {
pub ie: InotifyEvent,
pub date: DateTime<Local>,
}
impl std::fmt::Debug for FileEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{ie:?}", ie = self.ie)
}
}
async fn send_to_ipbl_zmq(socket: &zmq::Socket, ip: &IpData) {
let msg = format!("{value}", value = serde_json::to_string(&ip).unwrap());
socket.send(&msg, 0).unwrap();
socket.recv_string(0).unwrap().unwrap();
}
async fn send_to_ipbl_ws(ctx: &Context, ip: &IpData, ret: &mut Vec<String>) {
ret.push(format!("host: {hostname}", hostname = ctx.hostname));
loop {
match push_ip(&ctx, &ip, ret).await {
Ok(_) => {
break;
}
Err(err) => {
println!("{err}");
sleep(1);
}
};
}
}
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
let ctx = ctx.lock().await;
let prefix = format!(
"{subscription} ",
subscription = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
drop(ctx);
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(_) => None,
},
Err(_) => None,
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpData = serde_json::from_str(msg).unwrap();
if tosend.hostname != gethostname(true) {
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}

View File

@ -1,13 +1,17 @@
mod config; mod config;
mod fw; mod firewall;
mod ip; mod ip;
mod ipblc; mod ipblc;
mod monitoring;
mod utils; mod utils;
mod webservice; mod zmqcom;
mod websocket;
use config::Context;
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::main] #[tokio::main]
pub async fn main() { pub async fn main() {
ipblc::run().await; // Create a new context
let ctx = Arc::new(Mutex::new(Context::new().await));
ipblc::inc::process(&ctx).await;
} }

View File

@ -1,75 +0,0 @@
use crate::config::Context;
use std::{io, sync::Arc};
use serde_json;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxarc = ctxarc.clone();
let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() };
let listener = match TcpListener::bind(addr).await {
Ok(o) => o,
Err(e) => {
println!("error: {e}");
std::process::exit(1);
}
};
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((mut socket, _addr)) => {
let mut buf = vec![0; 1024];
match socket.readable().await {
Ok(_) => {
match socket.try_read(&mut buf) {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
continue;
}
};
}
Err(e) => {
println!("error: {e}");
continue;
}
}
let msg = match String::from_utf8(buf.to_vec()) {
Ok(o) => o.trim_matches(char::from(0)).trim().to_string(),
Err(_) => "".to_string(),
};
let res = format_result(&ctxarc, msg.as_str()).await;
match socket.write_all(res.as_bytes()).await {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
}
}
}
Err(err) => {
println!("error: {err}");
}
}
}
});
Ok(())
}
async fn format_result(ctxarc: &Arc<RwLock<Context>>, mode: &str) -> String {
let data;
let ctx = ctxarc.read().await;
match mode {
"cfg" => data = serde_json::to_string(&ctx.cfg).unwrap(),
"blocklist" => data = serde_json::to_string(&ctx.blocklist).unwrap(),
_ => data = serde_json::to_string(&ctx.blocklist).unwrap(),
};
data
}

View File

@ -1 +1 @@
(((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*)|(((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?) ((^\s*((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*$)|(^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$))

View File

@ -1,13 +1,22 @@
use std::{boxed::Box, fs::File, io::*}; use ipnet::IpNet;
use lazy_static::lazy_static;
use nix::unistd; use nix::unistd;
use tokio::time::{sleep, Duration}; use regex::Regex;
use std::boxed::Box;
use std::fs::File;
use std::io::*;
use std::path::Path;
use std::time::Duration;
lazy_static! {
static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap();
}
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> { pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
let mut file = match File::open(filename) { let mut file = match File::open(filename) {
Ok(o) => o, Ok(f) => f,
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
return None; return None;
} }
}; };
@ -16,17 +25,35 @@ pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
Some(lines) Some(lines)
} }
pub async fn sleep_s(s: u64) { pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
sleep(Duration::from_secs(s)).await; // Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
} }
#[allow(dead_code)] pub fn build_trustnets(cfgtrustnets: &Vec<String>) -> Vec<IpNet> {
pub async fn sleep_ms(m: u64) { let mut trustnets: Vec<IpNet> = vec![];
sleep(Duration::from_millis(m)).await; for trustnet in cfgtrustnets {
match trustnet.parse() {
Ok(net) => trustnets.push(net),
Err(err) => {
println!("error parsing {trustnet}, error: {err}");
}
};
}
trustnets
}
pub fn sleep(seconds: u64) {
std::thread::sleep(Duration::from_secs(seconds));
} }
pub fn gethostname(show_fqdn: bool) -> String { pub fn gethostname(show_fqdn: bool) -> String {
let hostname_cstr = unistd::gethostname().expect("Failed getting hostname"); let mut buf = [0u8; 64];
let hostname_cstr = unistd::gethostname(&mut buf).expect("Failed getting hostname");
let fqdn = hostname_cstr let fqdn = hostname_cstr
.to_str() .to_str()
.expect("Hostname wasn't valid UTF-8") .expect("Hostname wasn't valid UTF-8")
@ -37,3 +64,19 @@ pub fn gethostname(show_fqdn: bool) -> String {
} }
hostname[0].to_string() hostname[0].to_string()
} }
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}

View File

@ -1,85 +0,0 @@
use crate::config::{httpclient, Context};
use crate::ip::{IpData, IpEvent};
use crate::utils::sleep_s;
use reqwest::Client;
use reqwest::Error as ReqError;
const MAX_FAILED_API_RATE: u64 = 10;
pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut try_req = 0;
let client = httpclient();
loop {
match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await {
Ok(_) => {
break;
}
Err(e) => {
println!("error: {e}");
sleep_s(1).await;
if try_req == MAX_FAILED_API_RATE {
break;
}
try_req += 1;
}
};
}
}
async fn push_ip(client: &Client, server: &str, ip: &IpData) -> Result<(), ReqError> {
let mut data: Vec<IpData> = vec![];
data.push(IpData {
t: ip.t,
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
});
client
.post(format!("{server}/ips"))
.json(&data)
.send()
.await?;
Ok(())
}
async fn _push_ip_bulk(
ctx: &Context,
ips: &Vec<IpData>,
ret: &mut Vec<String>,
) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
for ip in ips {
data.push(IpData {
t: ip.t,
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
})
}
let resp = httpclient()
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}

View File

@ -1,142 +0,0 @@
use crate::config::{Context, WebSocketCfg};
use crate::ip::IpEvent;
use crate::utils::{gethostname, sleep_s};
use std::{
io::{self, Write},
net::TcpStream,
sync::Arc,
};
use serde_json::json;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tungstenite::stream::*;
use tungstenite::*;
pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event, wscfg);
{
let ctx = ctxarc.read().await;
bootstrap_event = ctx.cfg.bootstrap_event().clone();
wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
}
wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr;
}
pub async fn websocketpubsub(
ctxarc: &Arc<RwLock<Context>>,
txpubsub: Arc<RwLock<Sender<IpEvent>>>,
) {
let cfg;
{
let ctx = ctxarc.read().await;
cfg = ctx.cfg.ws.get("pubsub").unwrap().clone();
}
let mut websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, &gethostname(true)).await.unwrap(),
));
tokio::spawn(async move {
loop {
let mut ws = websocket.write().await;
match ws.read() {
Ok(msg) => {
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
Ok(o) => o,
Err(e) => {
println!("error in pubsub: {e:?}");
continue;
}
};
match tosend.ipdata.clone() {
Some(o) => {
if o.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
let txps = txpubsub.read().await;
txps.send(tosend).await.unwrap();
}
}
None => {
let txps = txpubsub.read().await;
txps.send(tosend.clone()).await.unwrap();
}
}
}
Err(e) => {
println!("error in pubsub: {e:?}");
ws.close(None).unwrap();
drop(ws);
websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, &gethostname(true)).await.unwrap(),
));
}
};
}
});
}
pub async fn websocketconnect<'a>(
wscfg: &WebSocketCfg,
hostname: &String,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> {
let endpoint = &wscfg.endpoint;
print!("connecting to {} ... ", endpoint);
io::stdout().flush().unwrap();
let mut socket;
loop {
(socket, _) = match connect(endpoint) {
Ok((o, e)) => (o, e),
_ => {
println!("error connecting to {endpoint}, retrying");
sleep_s(1).await;
continue;
}
};
break;
}
println!("connected to {endpoint}");
let msg = json!({ "hostname": hostname });
socket.send(Message::Text(msg.to_string().into())).unwrap();
Ok(socket)
}
pub async fn send_to_ipbl_websocket(
ws: &mut WebSocket<MaybeTlsStream<TcpStream>>,
ip: &IpEvent,
) -> bool {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
if ws.can_write() {
match ws.send(Message::Text(msg.into())) {
Ok(_) => {}
Err(e) => {
println!("err send read: {e:?}");
return false;
}
};
} else {
println!("can't write to socket");
return false;
};
if ws.can_read() {
match ws.read() {
Ok(_) => {}
Err(e) => {
println!("err send read: {e:?}");
return false;
}
};
} else {
println!("can't read from socket");
return false;
};
true
}

15
src/zmqcom.rs Normal file
View File

@ -0,0 +1,15 @@
use crate::config::*;
use zmq;
const ZMQPROTO: &str = "tcp";
pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result<zmq::Socket, zmq::Error> {
let zctx = zmq::Context::new();
let zmqhost = &zmqcfg.hostname;
let zmqport = zmqcfg.port;
let socket = zctx.socket(zmqtype).unwrap();
let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}");
socket.connect(&connectstring.as_str())?;
Ok(socket)
}

View File

@ -1,3 +1,4 @@
use ipnet::Ipv4Net;
use nftnl::{nft_set, set::Set, Batch, FinalizedBatch, ProtoFamily, Table}; use nftnl::{nft_set, set::Set, Batch, FinalizedBatch, ProtoFamily, Table};
use std::{ffi::CString, io::*, net::Ipv4Addr}; use std::{ffi::CString, io::*, net::Ipv4Addr};