Compare commits

..

No commits in common. "master" and "1.1.0" have entirely different histories.

24 changed files with 1500 additions and 2877 deletions

View File

@ -8,20 +8,13 @@ platform:
arch: amd64 arch: amd64
steps: steps:
- name: build and test - name: test and build
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev - apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH} - cargo build --verbose --all
- chmod +x $${RUSTC_WRAPPER} - cargo test --verbose --all
- cargo b -v
- cargo t -v
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -33,19 +26,12 @@ steps:
- tag - tag
- name: release - name: release
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev - apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH} - cargo build --release --verbose --all
- chmod +x $${RUSTC_WRAPPER}
- cargo b -r -v
- cd target/release - cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -61,6 +47,9 @@ steps:
api_key: api_key:
from_secret: gitea_token from_secret: gitea_token
files: "target/release/*.tar.gz" files: "target/release/*.tar.gz"
checksum:
- sha256
- sha512
environment: environment:
PLUGIN_TITLE: "" PLUGIN_TITLE: ""
when: when:
@ -84,20 +73,13 @@ platform:
arch: arm64 arch: arm64
steps: steps:
- name: build and test - name: test and build
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev - apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH} - cargo build --verbose --all
- chmod +x $${RUSTC_WRAPPER} - cargo test --verbose --all
- cargo b -v
- cargo t -v
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -109,19 +91,12 @@ steps:
- tag - tag
- name: release - name: release
image: rust:1 image: rust:1
pull: always
commands: commands:
- apt-get update -y - apt-get update -y
- apt-get install -y libnftnl-dev libmnl-dev libclang-dev - apt-get install -y libzmq3-dev libnftnl-dev libmnl-dev
- curl -o $${RUSTC_WRAPPER} https://assets.paulbsd.com/sccache_linux_${DRONE_STAGE_ARCH} - cargo build --release --verbose --all
- chmod +x $${RUSTC_WRAPPER}
- cargo b -r -v
- cd target/release - cd target/release
- tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc - tar -czvf ipblc-${DRONE_TAG}-${DRONE_STAGE_OS}-${DRONE_STAGE_ARCH}.tar.gz ipblc
environment:
RUSTC_WRAPPER: /usr/bin/sccache
SCCACHE_WEBDAV_ENDPOINT: https://sccache.paulbsd.com
SCCACHE_WEBDAV_KEY_PREFIX: sccache
volumes: volumes:
- name: cargo - name: cargo
path: /usr/local/cargo/registry path: /usr/local/cargo/registry
@ -137,6 +112,9 @@ steps:
api_key: api_key:
from_secret: gitea_token from_secret: gitea_token
files: "target/release/*.tar.gz" files: "target/release/*.tar.gz"
checksum:
- sha256
- sha512
environment: environment:
PLUGIN_TITLE: "" PLUGIN_TITLE: ""
when: when:

2106
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package] [package]
name = "ipblc" name = "ipblc"
version = "1.8.1" version = "1.1.0"
edition = "2021" edition = "2021"
authors = ["PaulBSD <paul@paulbsd.com>"] authors = ["PaulBSD <paul@paulbsd.com>"]
description = "ipblc is a tool that search and send attacking ip addresses to ipbl" description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
@ -10,23 +10,32 @@ repository = "https://git.paulbsd.com/paulbsd/ipblc"
[dependencies] [dependencies]
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.5", features = ["string"] } clap = { version = "4.0", features = ["string"] }
git-version = "0.3" git-version = "0.3.5"
ipnet = "2.11" ipnet = "2.7"
lazy_static = "1.5" lazy_static = "1.4"
nix = { version = "0.30", features = ["hostname", "inotify"] } mnl = "0.2"
regex = "1.11" nftnl = "0.6"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } nix = "0.26"
rustables = "0.8.6" regex = "1.7"
rustables-macros = "0.1.2" reqwest = { version = "0.11", default-features = false, features = ["json","rustls-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sd-notify = { version = "0.4" } tokio = { version = "1.23", features = ["full"] }
tokio = { version = "1.45", features = ["full", "sync"] } zmq = "0.10"
tungstenite = { version = "0.26", features = ["handshake", "rustls-tls-native-roots"] }
## to optimize binary size (slow compile time) # [target.aarch64-unknown-linux-gnu.dependencies]
#[profile.release] # flate2 = { version = "1.0", features = ["cloudflare_zlib"] }
#strip = true
#lto = true # [target.aarch64-linux-android.dependencies]
#opt-level = "z" # flate2 = { version = "1.0", features = ["zlib"] }
# [target.armv7-unknown-linux-gnueabihf.dependencies]
# flate2 = { version = "1.0", features = ["zlib"] }
# [target.x86_64-unknown-linux-gnu.dependencies]
# flate2 = { version = "1.0", features = ["cloudflare_zlib"] }
[profile.release]
debug = false
opt-level = 3

View File

@ -2,5 +2,5 @@ FROM rustembedded/cross:aarch64-unknown-linux-musl
RUN dpkg --add-architecture arm64 RUN dpkg --add-architecture arm64
RUN apt-get update RUN apt-get update
RUN apt-get install -y libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64 RUN apt-get install -y libasound2-dev:arm64 libzmq3-dev libnftnl-dev libmnl-dev libmnl0:arm64 libnftnl7:arm64 libmnl0:amd64 libnftnl0:arm64
RUN apt-get clean RUN apt-get clean

View File

@ -1,10 +0,0 @@
# Notes
### Date formats
```
nginx: 2006-01-02T15:04:05+01:00
ssh: 2006-01-02T15:04:05.000000+01:00
openvpn: 2006-01-02 15:04:05
mail: 2006-01-02T15:04:05.000000+01:00
```

View File

@ -4,8 +4,8 @@
## Summary ## Summary
ipblc is client-side intrusion prevention software working closely with ipbl ipblc is a tool that search and send attacking ip addresses to ipbl
It's pub/sub features are websockets based It's notification features are based on zeromq
## Howto ## Howto
@ -26,15 +26,14 @@ cargo build --release
### Usage ### Usage
``` ```
ipblc is a tool that search and send attacking ip addresses to ipbl USAGE:
ipblc [OPTIONS]
Usage: ipblc [OPTIONS] OPTIONS:
Options:
-s, --server <server> Sets a http server [default: https://ipbl.paulbsd.com]
-d Enable debugging -d Enable debugging
-h, --help Print help information -h, --help Print help information
-V, --version Print version information -s, --server <server> Sets a ipbl server [default: https://ipbl.paulbsd.com]
-V, --version Print version informatio
``` ```
### TODO ### TODO
@ -45,20 +44,20 @@ Options:
- ✅ Handle zeromq data transfer - ✅ Handle zeromq data transfer
- ✅ Code optimizations (WIP) - ✅ Code optimizations (WIP)
- ✅ Error handing when fetching config - ✅ Error handing when fetching config
- ✅ Local bound tcp api socket
- ✅ ZMQ -> Websocket
- ✅ Bug in RwLocks (agent often give up)
- ❌ Create memory friendly structs for ipdata
### Date formats
### Notes ```
nginx: 2006-01-02T15:04:05+01:00
See [here](NOTES.md) ssh: 2006-01-02T15:04:05.000000+01:00
openvpn: 2006-01-02 15:04:05
mail: 2006-01-02T15:04:05.000000+01:00
```
## License ## License
```text ```text
Copyright (c) 2022, 2023 PaulBSD Copyright (c) 2021, 2022 PaulBSD
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without Redistribution and use in source and binary forms, with or without

View File

@ -1,143 +0,0 @@
use rustables::*;
use rustables::{expr::*, Chain, Rule, Table};
use std::{io::*, net::*};
const TABLE_NAME: &str = "ipblc4";
const CHAIN_NAME: &str = "ipblc";
fn main() -> Result<()> {
/*let name = "blabla";
let mut batch = Batch::new();
let table = Table::new(ProtocolFamily::Ipv4).with_name(name);
batch.add(&table, MsgType::Add);
let mut chain = Chain::new(&table).with_name(name);
batch.add(&chain, MsgType::Add);
let toadd1: Ipv4Addr = "9.9.9.8".parse().unwrap();
let toadd2: Ipv4Addr = "9.9.9.1".parse().unwrap();
let mut setbuilder: SetBuilder<Ipv4Addr> = SetBuilder::new("s1", &table).unwrap();
setbuilder.add(&toadd1);
setbuilder.add(&toadd2);
let (mut set, setelem) = setbuilder.finish();
batch.add(&setelem, MsgType::Add);
//batch.add(&set, MsgType::Add);
set.family = ProtocolFamily::Ipv4;
set.id = Some(5);
set.flags = Some(0);
set.userdata = Some("test".into());
println!("{:?}", setelem);*/
let get_table = || -> Result<Option<Table>> {
let tables = list_tables().unwrap();
for table in tables {
if let Some(name) = table.get_name() {
println!("Found table {}", name);
if name == TABLE_NAME {
return Ok(Some(table));
}
}
}
Ok(None)
};
let get_chain = |table: &Table| -> Result<Option<Chain>> {
let chains = list_chains_for_table(table).unwrap();
for chain in chains {
if let Some(name) = chain.get_name() {
println!("Found chain {}", name);
if name == CHAIN_NAME {
return Ok(Some(chain));
}
}
}
Ok(None)
};
let table = get_table().unwrap().expect("no table?");
let chain = get_chain(&table).unwrap().expect("no chain?");
let ip: IpAddr = "184.73.167.217".parse().unwrap();
let cmprule = Rule::new(&chain).unwrap().saddr(ip).drop();
println!("{:?}", cmprule);
let mut gexpr = RawExpression::default();
for e in cmprule.get_expressions().unwrap().iter() {
if let Some(ExpressionVariant::Cmp(_)) = e.get_data() {
gexpr = e.clone();
}
}
let rules = list_rules_for_chain(&chain).unwrap();
for rule in rules {
let handle = rule.get_handle().unwrap();
println!("handle {}", handle);
let exprs = rule.get_expressions().unwrap();
for expr in exprs.iter() {
if let Some(ExpressionVariant::Cmp(_)) = expr.get_data() {
if expr.clone() == gexpr {
println!("{:?}", expr.get_data());
println!("test");
break;
}
}
//if expr.get_data()
//if expr.
}
}
//let mut set: Set<Ipv4Addr> = nft_set!(
// &CString::new("blabla").unwrap(),
// 32,
// &table,
// ProtoFamily::Ipv4 //ProtoFamily::Ipv4;
// //[&toadd1,&toadd2,]
//);
////println!("{:?}", set.0);
//set.add(&toadd1);
//set.add(&toadd2);
//batch.add(&set, MsgType::Add);
//let mut rule = Rule::new(&chain)
// .unwrap()
// .with_expr(
// HighLevelPayload::Network(NetworkHeaderField::IPv4(IPv4HeaderField::Saddr)).build(),
// )
// .with_expr(Lookup::new(&set).unwrap())
// .with_expr(Immediate::new_verdict(VerdictKind::Accept));
//println!("{:?}", rule);
//batch.add(&rule, rustables::MsgType::Add);
//match batch.send() {
// Ok(o) => {}
// Err(e) => {
// println!("{e}");
// }
//}
//rule.add_expr(&nft_expr!(payload ipv4 saddr));
//#[rustfmt::skip]
//rule.add_expr(&nft_expr!(lookup &set));
//rule.add_expr(&nft_expr!(ct state));
//rule.add_expr(&nft_expr!(verdict drop));
//batch.add(&rule, MsgType::Add);
//let finalized_batch = batch.finalize();
//send_and_process(&finalized_batch)?;
Ok(())
}
#[allow(dead_code)]
#[derive(Debug)]
struct Error(String);
impl<T: std::error::Error> From<T> for Error {
fn from(error: T) -> Self {
Error(error.to_string())
}
}

View File

@ -1,65 +0,0 @@
use nftnl::{nft_expr, set::Set, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
use std::{ffi::CString, io::*, net::Ipv4Addr};
fn main() -> std::result::Result<(), Error> {
let table_name = format!("ipblc4");
let table = Table::new(
&CString::new(format!("{table_name}")).unwrap(),
ProtoFamily::Ipv4,
);
let mut batch = Batch::new();
batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del);
batch.add(&table, nftnl::MsgType::Add);
let mut chain = Chain::new(&CString::new("test").unwrap(), &table);
chain.set_hook(nftnl::Hook::In, 1);
chain.set_policy(nftnl::Policy::Accept);
batch.add(&chain, nftnl::MsgType::Add);
batch.add(&Rule::new(&chain), nftnl::MsgType::Del);
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept));
batch.add(&rule, nftnl::MsgType::Add);
let finalized_batch = batch.finalize();
send_and_process(&finalized_batch)?;
Ok(())
}
fn send_and_process(batch: &FinalizedBatch) -> std::result::Result<(), Error> {
let seq: u32 = 2;
let socket = mnl::Socket::new(mnl::Bus::Netfilter)?;
socket.send_all(batch)?;
let mut buffer = vec![0; nftnl::nft_nlmsg_maxsize() as usize];
while let Some(message) = socket_recv(&socket, &mut buffer[..])? {
match mnl::cb_run(message, seq, socket.portid())? {
mnl::CbResult::Stop => {
break;
}
mnl::CbResult::Ok => (),
}
}
Ok(())
}
fn socket_recv<'a>(
socket: &mnl::Socket,
buf: &'a mut [u8],
) -> std::result::Result<Option<&'a [u8]>, Error> {
let ret = socket.recv(buf)?;
if ret > 0 {
Ok(Some(&buf[..ret]))
} else {
Ok(None)
}
}

View File

@ -1,28 +0,0 @@
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}
pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
// Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
}
pub async fn _sleep_ms(ms: u64) {
sleep(Duration::from_millis(ms)).await;
}

View File

@ -1,106 +0,0 @@
use crate::config::{Context, ZMQCfg};
use crate::ip::IpEvent;
use crate::utils::gethostname;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
const ZMQPROTO: &str = "tcp";
pub async fn zconnect<'a>(
zmqcfg: &ZMQCfg,
zmqtype: zmq::SocketType,
) -> Result<zmq::Socket, zmq::Error> {
let zctx = zmq::Context::new();
let zmqhost = &zmqcfg.hostname;
let zmqport = &zmqcfg.port;
let socket = zctx.socket(zmqtype).unwrap();
let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}");
socket.connect(&connectstring.as_str())?;
Ok(socket)
}
pub async fn zmqinit(ctx: &Arc<RwLock<Context>>, ipeventtx: &Sender<IpEvent>) -> zmq::Socket {
let ctxarc = Arc::clone(&ctx);
let zmqreqsocket;
let zmqsubsocket;
{
let zmqctx = ctxarc.read().await;
zmqreqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await
.unwrap();
zmqsubsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
.await
.unwrap();
}
listenpubsub(&ctx, ipeventtx.clone(), zmqsubsocket).await;
return zmqreqsocket;
}
async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpEvent>, socket: zmq::Socket) {
let prefix;
{
let ctx = ctx.read().await;
prefix = format!(
"{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
}
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(e) => {
println!("{e:?}");
None
}
},
Err(e) => {
println!("{e:?}");
None
}
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpEvent = serde_json::from_str(msg).unwrap();
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}
pub async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &IpEvent, _ret: &mut Vec<String>) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}
Err(e) => {
println!("{e:?}")
}
};
match reqsocket.recv_string(0) {
Ok(o) => match o {
Ok(_) => {}
Err(ee) => {
println!("{ee:?}")
}
},
Err(e) => {
println!("{e:?}")
}
};
}

View File

@ -1,6 +0,0 @@
#!/bin/sh
git pull
cargo b --release
sudo systemctl stop ipblc
sudo cp target/release/ipblc /usr/local/apps/ipblc/ipblc
sudo systemctl start ipblc

View File

@ -1,38 +1,35 @@
use crate::ip::{BlockIpData, IpData, IpEvent}; use crate::ip::{BlockIpData, IpData};
use crate::utils::{gethostname, sleep_s}; use crate::utils::{gethostname, sleep_s};
use std::{
collections::HashMap,
hash::{Hash, Hasher},
path::Path,
};
use chrono::prelude::*; use chrono::prelude::*;
use chrono::Duration; use chrono::Duration;
use clap::{Arg, ArgAction, ArgMatches, Command}; use clap::{Arg, ArgAction, ArgMatches, Command};
use git_version::git_version; use git_version::git_version;
use ipnet::IpNet; use ipnet::IpNet;
use nix::sys::inotify::{AddWatchFlags, Inotify, WatchDescriptor}; use nix::sys::inotify::{AddWatchFlags, InitFlags, Inotify, WatchDescriptor};
use regex::Regex; use regex::Regex;
use reqwest::{Client, Error as ReqError, Response}; use reqwest::{Client, Error as ReqError, Response};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::path::Path;
pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]); pub const GIT_VERSION: &str = git_version!();
const MASTERSERVER: &str = "ipbl.paulbsd.com"; const MASTERSERVER: &str = "ipbl.paulbsd.com";
const WSSUBSCRIPTION: &str = "ipbl"; const ZMQSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY_INTERVAL: u64 = 2; const CONFIG_RETRY: u64 = 10;
const WEB_CLIENT_TIMEOUT: i64 = 5;
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct Context { pub struct Context {
pub blocklist: HashMap<String, BlockIpData>, pub blocklist: HashMap<String, BlockIpData>,
pub cfg: Config, pub cfg: Config,
pub client: Client,
pub discovery: Discovery, pub discovery: Discovery,
pub flags: Flags, pub flags: Flags,
pub hostname: String,
pub instance: Box<Inotify>,
pub sas: HashMap<String, SetMap>, pub sas: HashMap<String, SetMap>,
pub hashwd: HashMap<String, WatchDescriptor>, pub hashwd: HashMap<String, WatchDescriptor>,
pub reloadinterval: u64,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -40,22 +37,22 @@ pub struct SetMap {
pub filename: String, pub filename: String,
pub fullpath: String, pub fullpath: String,
pub regex: Regex, pub regex: Regex,
pub set: SetCfg, pub set: Set,
pub watchedfiles: HashMap<String, u64>, pub watchedfiles: HashMap<String, u64>,
pub wd: WatchDescriptor, pub wd: WatchDescriptor,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Flags { pub struct Flags {
#[allow(dead_code)]
pub debug: bool, pub debug: bool,
pub server: String, pub server: String,
} }
impl Context { impl Context {
pub async fn new(inotify: &Inotify) -> Self { pub async fn new<'a>() -> Self {
// Get flags // Get flags
let argp: ArgMatches = Context::argparse(); let argp: ArgMatches = Context::argparse();
//let debug: bool = argp.contains_id("debug");
let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned(); let debug: bool = argp.get_one::<bool>("debug").unwrap().to_owned();
let server: String = argp.get_one::<String>("server").unwrap().to_string(); let server: String = argp.get_one::<String>("server").unwrap().to_string();
@ -63,23 +60,43 @@ impl Context {
let mut ctx = Context { let mut ctx = Context {
cfg: Config::new(), cfg: Config::new(),
flags: Flags { debug, server }, flags: Flags { debug, server },
hostname: gethostname(true),
discovery: Discovery { discovery: Discovery {
version: "1.0".to_string(), version: "1.0".to_string(),
urls: HashMap::new(), urls: HashMap::new(),
}, },
client: Client::builder()
.user_agent(format!(
"{}/{}@{}/{}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
GIT_VERSION,
gethostname(false)
))
.build()
.unwrap(),
sas: HashMap::new(), sas: HashMap::new(),
instance: Box::new(Inotify::init(InitFlags::empty()).unwrap()),
blocklist: HashMap::new(), blocklist: HashMap::new(),
hashwd: HashMap::new(), hashwd: HashMap::new(),
reloadinterval: 5,
}; };
loop {
print!("Loading config ... "); print!("Loading config ... ");
ctx.load(&inotify).await.unwrap(); match ctx.load().await {
Ok(_) => {
break;
}
Err(err) => {
println!("error loading config: {err}, retrying in {CONFIG_RETRY} secs");
sleep_s(CONFIG_RETRY);
}
}
}
ctx ctx
} }
pub fn argparse() -> ArgMatches { pub fn argparse<'a>() -> ArgMatches {
Command::new(env!("CARGO_PKG_NAME")) Command::new(env!("CARGO_PKG_NAME"))
.version(format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION)) .version(format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION))
.author(env!("CARGO_PKG_AUTHORS")) .author(env!("CARGO_PKG_AUTHORS"))
@ -101,177 +118,113 @@ impl Context {
.get_matches() .get_matches()
} }
#[allow(dead_code)]
pub async fn discovery(&self) -> Result<Discovery, ReqError> { pub async fn discovery(&self) -> Result<Discovery, ReqError> {
let resp: Result<Response, ReqError> = httpclient() let resp: Result<Response, ReqError> = self
.client
.get(format!("{server}/discovery", server = self.flags.server)) .get(format!("{server}/discovery", server = self.flags.server))
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: Discovery = match req.json().await { let data: Discovery = match req.json().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
Ok(data) Ok(data)
} }
pub async fn load(&mut self, inotify: &Inotify) -> Result<(), Box<dyn std::error::Error>> { pub async fn load(&mut self) -> Result<(), Box<dyn std::error::Error>> {
if cfg!(test) { if cfg!(test) {
return Ok(()); return Ok(());
} }
let mut last_in_err = false; self.discovery = self.discovery().await?;
loop { self.cfg.load(self.to_owned()).await?;
let res = self.cfg.load(&self.flags.server).await; self.create_sas().await?;
match res {
Ok(()) => {
if last_in_err {
println!("loaded config");
}
break;
}
Err(e) => {
println!("error loading config: {e}, retrying in {CONFIG_RETRY_INTERVAL}s");
last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await;
}
};
}
let mut last_in_err = false;
loop {
let res = self.discovery().await;
match res {
Ok(o) => {
self.discovery = o;
if last_in_err {
println!("loaded discovery");
}
break;
}
Err(e) => {
println!("error loading disvoery: {e}, retrying in {CONFIG_RETRY_INTERVAL}s");
last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await;
}
};
}
if last_in_err {
println!("creating sas");
}
self.create_sas(&inotify).await?;
if last_in_err {
println!("created sas");
}
Ok(()) Ok(())
} }
#[cfg(test)] #[cfg(test)]
pub async fn get_blocklist_pending(&self) -> Vec<BlockIpData> { pub async fn get_blocklist_pending(&self) -> Vec<IpData> {
let mut res: Vec<BlockIpData> = vec![]; let mut res: Vec<IpData> = vec![];
for (_, ipblock) in self.blocklist.iter() { for (_, v) in self.blocklist.iter() {
res.push(ipblock.clone()); res.push(v.ipdata.clone());
} }
res res
} }
pub async fn get_blocklist_toblock(&self, all: bool) -> Vec<BlockIpData> { pub async fn get_blocklist_toblock(&mut self) -> Vec<IpData> {
let mut res: Vec<BlockIpData> = vec![]; let mut res: Vec<IpData> = vec![];
for (_, ipblock) in self.blocklist.iter() { let now: DateTime<Local> = Local::now().trunc_subsecs(0);
match self.cfg.sets.get(&ipblock.ipdata.src) { for (_, block) in self.blocklist.iter_mut() {
Some(set) => { let set = self.cfg.sets.get(&block.ipdata.src.to_string()).unwrap();
if ipblock.tryfail >= set.tryfail && (!ipblock.blocked || all) { if block.tryfail >= set.tryfail {
res.push(ipblock.clone()); res.push(block.ipdata.clone());
if block.tryfail == set.tryfail {
block.starttime = DateTime::from(now);
} }
} }
None => {}
}
} }
res res
} }
pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> { pub async fn update_blocklist(&mut self, ipdata: &mut IpData) -> Option<IpData> {
match &ipevent.ipdata { match self.cfg.sets.get(&ipdata.src) {
Some(ipdata) => match self.cfg.sets.get(&ipdata.src) {
Some(set) => { Some(set) => {
if self.blocklist.contains_key(&ipdata.ip)
&& self.hostname == ipdata.hostname
&& ipdata.mode == "file".to_string()
{
let mut block = self.blocklist.get_mut(&ipdata.ip).unwrap();
block.tryfail += 1;
block.blocktime = set.blocktime;
if block.tryfail >= set.tryfail {
return Some(block.ipdata.clone());
}
} else {
let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str()) let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str())
.unwrap() .unwrap()
.with_timezone(&chrono::Local); .with_timezone(&chrono::Local);
let blocktime = set.blocktime;
let blocked = false;
let handle = u64::MIN;
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
let block =
self.blocklist self.blocklist
.entry(ipdata.ip.to_string()) .entry(ipdata.ip.to_string())
.or_insert(BlockIpData { .or_insert(BlockIpData {
ipdata: ipdata.clone(), ipdata: ipdata.clone(),
tryfail: 0, tryfail: 100,
starttime, starttime,
blocktime, blocktime: set.blocktime,
blocked,
handle,
});
block.tryfail += 1;
block.blocktime = blocktime;
if block.tryfail >= set.tryfail {
return Some(ipevent.clone());
}
} else {
self.blocklist
.entry(ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
blocked,
handle,
}); });
} }
} }
None => {} None => {}
},
None => {}
} }
None None
} }
pub async fn gc_blocklist(&mut self) -> Vec<BlockIpData> { pub async fn gc_blocklist(&mut self) -> Vec<IpData> {
let mut removed: Vec<BlockIpData> = vec![]; let mut removed: Vec<IpData> = vec![];
let now: DateTime<Local> = Local::now().trunc_subsecs(0); let now: DateTime<Local> = Local::now().trunc_subsecs(0);
// nightly, future use // nightly, future use
//let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate) //let drained: HashMap<String,IpData> = ctx.blocklist.drain_filter(|k,v| v.parse_date() < mindate)
for (ip, blocked) in self.blocklist.clone().iter() { for (ip, blocked) in self.blocklist.clone().iter() {
/*match self.cfg.sets.get(&blocked.ipdata.src) {
Some(set) => {
let mut block = self.blocklist.get_mut(ip).unwrap();
block.blocktime = set.blocktime.clone();
}
None => {}
}*/
let mindate = now - Duration::minutes(blocked.blocktime); let mindate = now - Duration::minutes(blocked.blocktime);
if blocked.starttime < mindate { if blocked.starttime < mindate {
self.blocklist.remove(&ip.clone()).unwrap(); self.blocklist.remove(&ip.clone()).unwrap();
removed.push(blocked.clone()); removed.push(blocked.ipdata.clone());
} }
} }
removed removed
} }
pub async fn create_sas( pub async fn create_sas(&mut self) -> Result<(), Box<dyn std::error::Error>> {
&mut self,
inotify: &Inotify,
) -> Result<(), Box<dyn std::error::Error>> {
for (src, set) in self.cfg.sets.iter() { for (src, set) in self.cfg.sets.iter() {
let p = Path::new(set.path.as_str()); let p = Path::new(set.path.as_str());
if p.is_dir() { if p.is_dir() {
let wd = match self.hashwd.get(&set.path.to_string()) { let wd = match self.hashwd.get(&set.path.to_string()) {
Some(wd) => *wd, Some(wd) => *wd,
None => { None => {
let res = inotify let res = self
.instance
.add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY) .add_watch(set.path.as_str(), AddWatchFlags::IN_MODIFY)
.unwrap(); .unwrap();
self.hashwd.insert(set.path.to_string(), res); self.hashwd.insert(set.path.to_string(), res);
@ -317,11 +270,10 @@ impl Context {
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Config { pub struct Config {
pub sets: HashMap<String, SetCfg>, pub sets: HashMap<String, Set>,
#[serde(skip_serializing)] #[serde(skip_serializing)]
pub trustnets: Vec<String>, pub trustnets: Vec<String>,
pub ws: HashMap<String, WebSocketCfg>, pub zmq: HashMap<String, ZMQ>,
pub api: String,
} }
impl Config { impl Config {
@ -329,7 +281,7 @@ impl Config {
Self { Self {
sets: HashMap::from([ sets: HashMap::from([
("smtp".to_string(), ("smtp".to_string(),
SetCfg { Set {
src: "smtp".to_string(), src: "smtp".to_string(),
filename: "mail.log".to_string(), filename: "mail.log".to_string(),
regex: "(SASL LOGIN authentication failed)".to_string(), regex: "(SASL LOGIN authentication failed)".to_string(),
@ -338,7 +290,7 @@ impl Config {
tryfail: 5, tryfail: 5,
}), }),
("ssh".to_string(), ("ssh".to_string(),
SetCfg { Set {
src: "ssh".to_string(), src: "ssh".to_string(),
filename: "auth.log".to_string(), filename: "auth.log".to_string(),
regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(), regex: "(Invalid user|BREAK|not allowed because|no matching key exchange method found)".to_string(),
@ -347,7 +299,7 @@ impl Config {
tryfail: 5, tryfail: 5,
},), },),
("http".to_string(), ("http".to_string(),
SetCfg { Set {
src: "http".to_string(), src: "http".to_string(),
filename: "".to_string(), filename: "".to_string(),
regex: "(anonymousfox.co)".to_string(), regex: "(anonymousfox.co)".to_string(),
@ -356,7 +308,7 @@ impl Config {
tryfail: 5, tryfail: 5,
},), },),
("openvpn".to_string(), ("openvpn".to_string(),
SetCfg { Set {
src: "openvpn".to_string(), src: "openvpn".to_string(),
filename: "status".to_string(), filename: "status".to_string(),
regex: "(UNDEF)".to_string(), regex: "(UNDEF)".to_string(),
@ -371,75 +323,90 @@ impl Config {
"172.16.0.0/12".to_string(), "172.16.0.0/12".to_string(),
"192.168.0.0/16".to_string(), "192.168.0.0/16".to_string(),
], ],
ws: HashMap::from([("pubsub".to_string(),WebSocketCfg{ zmq: HashMap::from([("pubsub".to_string(),ZMQ{
t: "pubsub".to_string(), t: "pubsub".to_string(),
endpoint: format!("wss://{}/wsps", MASTERSERVER.to_string()), hostname: MASTERSERVER.to_string(),
subscription: WSSUBSCRIPTION.to_string(), port: 9999,
}),("reqrep".to_string(), WebSocketCfg { subscription: ZMQSUBSCRIPTION.to_string(),
}),("reqrep".to_string(),ZMQ {
t: "reqrep".to_string(), t: "reqrep".to_string(),
endpoint: format!("wss://{}/wsrr", MASTERSERVER.to_string()), hostname: MASTERSERVER.to_string(),
subscription: WSSUBSCRIPTION.to_string(), port: 9998,
})]), subscription: String::new(),
api: String::from("127.0.0.1:8060") })])
} }
} }
pub async fn load(&mut self, server: &String) -> Result<(), ReqError> { pub async fn load(&mut self, ctx: Context) -> Result<(), ReqError> {
self.get_config(server).await?; self.get_trustnets(&ctx).await?;
self.get_sets(&ctx).await?;
self.get_zmq_config(&ctx).await?;
Ok(()) Ok(())
} }
async fn get_config(&mut self, server: &String) -> Result<(), ReqError> { async fn get_trustnets(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = httpclient() let resp: Result<Response, ReqError> = ctx
.get(format!("{server}/config?v=2")) .client
.get(format!(
"{server}/config/trustlist",
server = ctx.flags.server
))
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await { let data: Vec<String> = match req.json::<Vec<String>>().await {
Ok(o) => o, Ok(res) => res,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
self.trustnets = data;
Ok(())
}
for d in data.sets { async fn get_sets(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp: Result<Response, ReqError> = ctx
.client
.get(format!("{server}/config/sets", server = ctx.flags.server))
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<Set> = match req.json::<Vec<Set>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
for d in data {
self.sets.insert(d.src.clone(), d); self.sets.insert(d.src.clone(), d);
} }
self.trustnets = data.trustlists;
data.ws.into_iter().map(|x| x).for_each(|x| {
self.ws.insert(x.t.to_string(), x);
});
self.api = data
.cfg
.get(&"api".to_string())
.unwrap_or(&self.api)
.clone();
Ok(()) Ok(())
} }
pub async fn _get_last(server: &String) -> Result<Vec<IpData>, ReqError> { async fn get_zmq_config(&mut self, ctx: &Context) -> Result<(), ReqError> {
let resp = httpclient() let resp: Result<Response, ReqError> = ctx
.get(format!("{server}/ips/last")) .client
.query(&[("interval", "3 hours")]) .get(format!("{server}/config/zmq", server = ctx.flags.server))
.send() .send()
.await; .await;
let req = match resp { let req = match resp {
Ok(o) => o, Ok(re) => re,
Err(e) => return Err(e), Err(err) => return Err(err),
}; };
let data: HashMap<String, ZMQ> = match req.json::<Vec<ZMQ>>().await {
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await { Ok(res) => {
Ok(o) => o, let mut out: HashMap<String, ZMQ> = HashMap::new();
Err(e) => return Err(e), res.into_iter().map(|x| x).for_each(|x| {
out.insert(x.t.to_string(), x);
});
out
}
Err(err) => return Err(err),
}; };
self.zmq = data;
Ok(data) Ok(())
} }
pub fn build_trustnets(&self) -> Vec<IpNet> { pub fn build_trustnets(&self) -> Vec<IpNet> {
@ -447,49 +414,17 @@ impl Config {
for trustnet in &self.trustnets { for trustnet in &self.trustnets {
match trustnet.parse() { match trustnet.parse() {
Ok(net) => trustnets.push(net), Ok(net) => trustnets.push(net),
Err(e) => { Err(err) => {
println!("error parsing {trustnet}, error: {e}"); println!("error parsing {trustnet}, error: {err}");
} }
}; };
} }
trustnets trustnets
} }
pub fn bootstrap_event(&self) -> IpEvent {
IpEvent {
msgtype: String::from("bootstrap"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: None,
}
}
}
pub fn httpclient() -> Client {
let client = Client::builder()
.user_agent(format!(
"{}/{}@{}/{}",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
GIT_VERSION,
gethostname(false)
))
.timeout(Duration::seconds(WEB_CLIENT_TIMEOUT).to_std().unwrap())
.build()
.unwrap();
client
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct GlobalConfigV2 { pub struct Set {
pub cfg: HashMap<String, String>,
pub sets: Vec<SetCfg>,
pub trustlists: Vec<String>,
pub ws: Vec<WebSocketCfg>,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct SetCfg {
pub src: String, pub src: String,
pub filename: String, pub filename: String,
pub regex: String, pub regex: String,
@ -499,10 +434,11 @@ pub struct SetCfg {
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WebSocketCfg { pub struct ZMQ {
#[serde(rename = "type")] #[serde(rename = "type")]
pub t: String, pub t: String,
pub endpoint: String, pub hostname: String,
pub port: i64,
pub subscription: String, pub subscription: String,
} }
@ -518,13 +454,13 @@ pub struct URL {
pub path: String, pub path: String,
} }
impl PartialEq for SetCfg { impl PartialEq for Set {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.src == other.src self.src == other.src
} }
} }
impl Hash for SetCfg { impl Hash for Set {
fn hash<H: Hasher>(&self, state: &mut H) { fn hash<H: Hasher>(&self, state: &mut H) {
self.src.hash(state); self.src.hash(state);
} }
@ -534,106 +470,66 @@ impl Hash for SetCfg {
mod test { mod test {
use super::*; use super::*;
use crate::ip::*; use crate::ip::*;
use nix::sys::inotify::InitFlags;
use Context; use Context;
pub async fn prepare_test_data() -> Context { pub async fn prepare_test_data() -> Context {
let inotify = Inotify::init(InitFlags::empty()).unwrap(); let mut ctx = Context::new().await;
let mut ctx = Context::new(&inotify).await;
let now: DateTime<Local> = Local::now().trunc_subsecs(0); let now: DateTime<Local> = Local::now().trunc_subsecs(0);
ctx.blocklist = HashMap::new(); ctx.blocklist = HashMap::new();
for _i in 0..10 { for _i in 0..10 {
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpData {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ip: "1.1.1.1".to_string(), ip: "1.1.1.1".to_string(),
hostname: "test1".to_string(), hostname: "test1".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "ssh".to_string(), src: "ssh".to_string(),
}), mode: "file".to_string(),
}) })
.await; .await;
} }
for _ in 0..10 { for _ in 0..10 {
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpData {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ip: "1.1.1.2".to_string(), ip: "1.1.1.2".to_string(),
hostname: "test2".to_string(), hostname: "test2".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), mode: "file".to_string(),
}) })
.await; .await;
} }
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpData {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ip: "1.1.1.3".to_string(), ip: "1.1.1.3".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), mode: "file".to_string(),
}) })
.await; .await;
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpData {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), mode: "file".to_string(),
}) })
.await; .await;
ctx.update_blocklist(&mut IpEvent { ctx.update_blocklist(&mut IpData {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 4,
ip: "1.1.1.4".to_string(), ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(), hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(), date: now.to_rfc3339().to_string(),
src: "http".to_string(), src: "http".to_string(),
}), mode: "file".to_string(),
})
.await;
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
t: 6,
ip: "2a00:1450:4007:805::2003".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
}) })
.await; .await;
let ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap(); let mut ip1 = ctx.blocklist.get_mut(&"1.1.1.1".to_string()).unwrap();
ip1.starttime = DateTime::from(now) - Duration::minutes(61); ip1.starttime = DateTime::from(now) - Duration::minutes(61);
let ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap(); let mut ip2 = ctx.blocklist.get_mut(&"1.1.1.2".to_string()).unwrap();
ip2.starttime = DateTime::from(now) - Duration::minutes(62); ip2.starttime = DateTime::from(now) - Duration::minutes(62);
ctx ctx
} }
@ -643,14 +539,8 @@ mod test {
let ctx = prepare_test_data().await; let ctx = prepare_test_data().await;
let pending = ctx.get_blocklist_pending().await; let pending = ctx.get_blocklist_pending().await;
assert_eq!(pending.len(), 5); assert_eq!(pending.len(), 4);
let ips = [ let ips = ["1.1.1.1", "1.1.1.2", "1.1.1.3", "1.1.1.4"];
"1.1.1.1",
"1.1.1.2",
"1.1.1.3",
"1.1.1.4",
"2a00:1450:4007:805::2003",
];
for i in ips { for i in ips {
let ip = ctx let ip = ctx
.blocklist .blocklist
@ -667,8 +557,8 @@ mod test {
pub async fn test_blocklist_toblock() { pub async fn test_blocklist_toblock() {
let mut ctx = prepare_test_data().await; let mut ctx = prepare_test_data().await;
ctx.gc_blocklist().await; ctx.gc_blocklist().await;
let toblock = ctx.get_blocklist_toblock(false).await; let toblock = ctx.get_blocklist_toblock().await;
assert_eq!(toblock.len(), 3); assert_eq!(toblock.len(), 2);
} }
#[tokio::test] #[tokio::test]

283
src/fw.rs
View File

@ -1,208 +1,107 @@
use crate::{config::Context, ip::BlockIpData, ipblc::PKG_NAME}; use crate::ip::IpData;
use std::{ use nftnl::{nft_expr, Batch, Chain, FinalizedBatch, ProtoFamily, Rule, Table};
io::Error, use std::{ffi::CString, io::Error, net::Ipv4Addr};
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::Arc,
};
use tokio::sync::RwLock; pub fn init(tablename: &String) -> (Batch, Table) {
use rustables::{expr::*, *};
pub enum FwTableType {
IPv4,
IPv6,
}
#[allow(dead_code)]
pub enum FwAction {
Add,
Delete,
}
macro_rules! initrules {
($batch:expr, $table:expr, $chain:ident, $reset:expr) => {
$chain.set_hook(Hook::new(HookClass::In, 1));
$batch.add(&$chain, MsgType::Add);
if $reset {
$batch.add(&Rule::new(&$chain).unwrap(), MsgType::Del);
}
};
}
macro_rules! makerules {
($ipdata:ident, $chain:ident, $batch:ident, $t:ty, $ip_t:ident,$action:ty) => {
let ip = $ipdata.ipdata.ip.parse::<$t>().unwrap();
Rule::new(&$chain)
.unwrap()
.saddr(ip.into())
.drop()
.add_to_batch(&mut $batch);
};
}
pub fn fwglobalinit(t: FwTableType, reset: bool) -> (Batch, Chain) {
let table_name: String;
let table: Table;
let mut chain: Chain;
match t {
FwTableType::IPv4 => {
table_name = format!("{PKG_NAME}4");
table = Table::new(ProtocolFamily::Ipv4).with_name(table_name);
chain = Chain::new(&table)
.with_policy(ChainPolicy::Accept)
.with_name(PKG_NAME);
}
FwTableType::IPv6 => {
table_name = format!("{PKG_NAME}6");
table = Table::new(ProtocolFamily::Ipv6).with_name(table_name);
chain = Chain::new(&table)
.with_policy(ChainPolicy::Accept)
.with_name(PKG_NAME);
}
}
let mut batch = Batch::new(); let mut batch = Batch::new();
batch.add(&table, MsgType::Add); let table = Table::new(
initrules!(batch, table, chain, reset); &CString::new(tablename.as_str()).unwrap(),
ProtoFamily::Ipv4,
);
(batch, chain) batch.add(&table, nftnl::MsgType::Add);
batch.add(&table, nftnl::MsgType::Del);
batch.add(&table, nftnl::MsgType::Add);
(batch, table)
} }
pub fn fwblock<'a>(ip_add: &BlockIpData) -> std::result::Result<&String, error::QueryError> { pub fn block(
let (mut batch4, chain4) = fwglobalinit(FwTableType::IPv4, false); tablename: &String,
let (mut batch6, chain6) = fwglobalinit(FwTableType::IPv6, false); ips_add: &Vec<IpData>,
match ip_add.ipdata.t {
4 => {
makerules!(ip_add, chain4, batch4, Ipv4Addr, ipv4, FwAction::Add);
match batch4.send() {
Ok(_) => {}
Err(e) => {
println!("block not ok {e} {ip_add:?}")
}
}
}
6 => {
makerules!(ip_add, chain6, batch6, Ipv6Addr, ipv6, FwAction::Add);
match batch6.send() {
Ok(_) => {}
Err(e) => {
println!("block not ok {e} {ip_add:?}")
}
}
}
_ => {}
}
Ok(&ip_add.ipdata.ip)
}
pub fn fwunblock<'a>(ip_del: &BlockIpData) -> std::result::Result<&String, error::QueryError> {
let (mut batch4, chain4) = fwglobalinit(FwTableType::IPv4, false);
let (mut batch6, chain6) = fwglobalinit(FwTableType::IPv6, false);
match ip_del.ipdata.t {
4 => {
let r = Rule::new(&chain4).unwrap().with_handle(ip_del.handle);
batch4.add(&r, MsgType::Del);
match batch4.send() {
Ok(_) => {}
Err(e) => {
println!("delete not ok {e} {ip_del:?}")
}
}
}
6 => {
let r = Rule::new(&chain6).unwrap().with_handle(ip_del.handle);
batch6.add(&r, MsgType::Del);
match batch6.send() {
Ok(_) => {}
Err(e) => {
println!("delete not ok {e} {ip_del:?}")
}
}
}
_ => {}
}
Ok(&ip_del.ipdata.ip)
}
pub async fn get_current_rules(
ctx: &Arc<RwLock<Context>>,
ret: &mut Vec<String>, ret: &mut Vec<String>,
fwlen: &mut usize, fwlen: &mut usize,
) -> Result<(), Error> { ) -> std::result::Result<(), Error> {
let mut ips_all_count = 0; // convert chain
let tables = vec![format!("{PKG_NAME}4"), format!("{PKG_NAME}6")]; let ips_add = convert(ips_add);
for table_name in tables { let (mut batch, table) = init(tablename);
let get_table = || -> Result<Option<Table>, Error> {
let tables = list_tables().unwrap(); // build chain
for table in tables { let mut chain = Chain::new(&CString::new(tablename.as_str()).unwrap(), &table);
if let Some(name) = table.get_name() { chain.set_hook(nftnl::Hook::In, 1);
if *name == table_name { chain.set_policy(nftnl::Policy::Accept);
return Ok(Some(table));
} // add chain
} batch.add(&chain, nftnl::MsgType::Add);
let rule = Rule::new(&chain);
batch.add(&rule, nftnl::MsgType::Del);
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept));
batch.add(&rule, nftnl::MsgType::Add);
// build and add rules
for ip in ips_add.clone() {
let mut rule = Rule::new(&chain);
rule.add_expr(&nft_expr!(payload ipv4 saddr));
rule.add_expr(&nft_expr!(cmp == ip));
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict drop));
batch.add(&rule, nftnl::MsgType::Add);
} }
Ok(None) // validate and send batch
}; let finalized_batch = batch.finalize();
send_and_process(&finalized_batch)?;
let get_chain = |table: &Table| -> Result<Option<Chain>, Error> { if fwlen != &mut ips_add.len() {
let chains = list_chains_for_table(table).unwrap(); ret.push(format!("{length} ip in firewall", length = ips_add.len()));
for chain in chains {
if let Some(name) = chain.get_name() {
if *name == "ipblc" {
return Ok(Some(chain));
} }
} *fwlen = ips_add.len();
}
Ok(None)
};
let table = get_table()?.expect("no table?");
let chain = get_chain(&table)?.expect("no chain?");
let mut ctx = { ctx.write().await };
let rules = list_rules_for_chain(&chain).unwrap().clone();
for (ip, c) in ctx.blocklist.iter_mut() {
let ip_parsed: IpAddr = ip.parse().unwrap();
let cmprule = Rule::new(&chain).unwrap().saddr(ip_parsed).drop();
let mut gexpr = RawExpression::default();
for e in cmprule.get_expressions().unwrap().iter() {
if let Some(ExpressionVariant::Cmp(_ip)) = e.get_data() {
gexpr = e.clone();
}
}
for rule in rules.iter() {
for expr in rule.get_expressions().unwrap().iter() {
if let Some(expr::ExpressionVariant::Cmp(_)) = expr.get_data() {
if gexpr == expr.clone() {
ips_all_count += 1;
c.handle = *rule.get_handle().unwrap();
}
}
}
}
}
}
if *fwlen != ips_all_count {
ret.push(format!("{length} ip in firewall", length = ips_all_count));
}
*fwlen = ips_all_count;
Ok(()) Ok(())
} }
#[allow(dead_code)] fn send_and_process(batch: &FinalizedBatch) -> std::result::Result<(), Error> {
fn fw_rules_count() -> i64 { let seq: u32 = 2;
0 let socket = mnl::Socket::new(mnl::Bus::Netfilter)?;
socket.send_all(batch)?;
let mut buffer = vec![0; nftnl::nft_nlmsg_maxsize() as usize];
while let Some(message) = socket_recv(&socket, &mut buffer[..])? {
match mnl::cb_run(message, seq, socket.portid())? {
mnl::CbResult::Stop => {
break;
}
mnl::CbResult::Ok => (),
}
}
Ok(())
}
fn socket_recv<'a>(
socket: &mnl::Socket,
buf: &'a mut [u8],
) -> std::result::Result<Option<&'a [u8]>, Error> {
let ret = socket.recv(buf)?;
if ret > 0 {
Ok(Some(&buf[..ret]))
} else {
Ok(None)
}
}
fn convert(input: &Vec<IpData>) -> Vec<Ipv4Addr> {
let mut output: Vec<Ipv4Addr> = vec![];
for val in input {
output.push(val.ip.parse::<Ipv4Addr>().unwrap());
}
output
} }

222
src/ip.rs
View File

@ -1,18 +1,16 @@
use crate::config::Context;
use crate::utils::gethostname; use crate::utils::gethostname;
use std::{
cmp::Ordering,
fmt::{Display, Formatter},
io::{BufRead, BufReader, Read},
net::IpAddr,
};
use chrono::offset::LocalResult;
use chrono::prelude::*; use chrono::prelude::*;
use ipnet::IpNet; use ipnet::IpNet;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use reqwest::Error as ReqError;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::io::{BufRead, BufReader, Read};
use std::net::IpAddr;
lazy_static! { lazy_static! {
static ref R_IPV4: Regex = Regex::new(include_str!("regexps/ipv4.txt")).unwrap(); static ref R_IPV4: Regex = Regex::new(include_str!("regexps/ipv4.txt")).unwrap();
@ -20,32 +18,13 @@ lazy_static! {
static ref R_DATE: Regex = Regex::new(include_str!("regexps/date.txt")).unwrap(); static ref R_DATE: Regex = Regex::new(include_str!("regexps/date.txt")).unwrap();
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize, Eq)]
pub struct IpEvent { pub struct IpData {
pub msgtype: String, pub ip: String,
pub mode: String, pub src: String,
pub date: String,
pub hostname: String, pub hostname: String,
pub ipdata: Option<IpData>, pub mode: String,
}
#[macro_export]
macro_rules! ipevent {
($msgtype:expr,$mode:expr,$hostname:expr,$ipdata:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: $ipdata,
}
};
($msgtype:expr,$mode:expr,$hostname:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: None,
}
};
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -54,30 +33,6 @@ pub struct BlockIpData {
pub tryfail: i64, pub tryfail: i64,
pub blocktime: i64, pub blocktime: i64,
pub starttime: DateTime<Local>, pub starttime: DateTime<Local>,
pub blocked: bool,
pub handle: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq)]
pub struct IpData {
pub t: isize,
pub ip: String,
pub src: String,
pub date: String,
pub hostname: String,
}
#[macro_export]
macro_rules! ipdata {
($t:expr,$ip:expr,$src:expr,$date:expr,$hostname:expr) => {
IpData {
t: $t.clone(),
ip: $ip.clone(),
src: $src.clone(),
date: $date.clone(),
hostname: $hostname.clone(),
}
};
} }
impl PartialEq for IpData { impl PartialEq for IpData {
@ -105,18 +60,89 @@ impl Display for IpData {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!( write!(
f, f,
"ip: {ip}, src: {src}, date: {date}, hostname: {hostname}", "ip: {ip}, src: {src}, date: {date}, hostname: {hostname}, mode: {mode}",
ip = self.ip, ip = self.ip,
src = self.src, src = self.src,
date = self.date, date = self.date,
hostname = self.hostname, hostname = self.hostname,
mode = self.mode,
) )
} }
} }
pub async fn push_ip(ctx: &Context, ip: &IpData, ret: &mut Vec<String>) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
data.push(IpData {
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
mode: "file".to_string(),
});
let resp = ctx
.client
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}
pub async fn _push_ip_bulk(
ctx: &Context,
ips: &Vec<IpData>,
ret: &mut Vec<String>,
) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
for ip in ips {
data.push(IpData {
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
mode: "file".to_string(),
})
}
let resp = ctx
.client
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}
pub fn filter( pub fn filter(
reader: Box<dyn Read>, lines: Box<dyn Read>,
iplist: &mut Vec<IpData>, list: &mut Vec<IpData>,
trustnets: &Vec<IpNet>, trustnets: &Vec<IpNet>,
regex: &Regex, regex: &Regex,
src: &String, src: &String,
@ -124,23 +150,19 @@ pub fn filter(
) -> isize { ) -> isize {
let mut ips = 0; let mut ips = 0;
let hostname = gethostname(true); let hostname = gethostname(true);
let lines = BufReader::new(reader).lines(); for line in BufReader::new(lines).lines() {
for line in lines.into_iter() {
if let Ok(l) = line { if let Ok(l) = line {
if regex.is_match(l.as_str()) { if regex.is_match(l.as_str()) {
let s_ipaddr: String; let s_ipaddr: String;
let t: isize;
match R_IPV4.captures(l.as_str()) { match R_IPV4.captures(l.as_str()) {
Some(sv4) => { Some(sv4) => {
s_ipaddr = sv4.get(0).unwrap().as_str().to_string(); s_ipaddr = sv4.get(0).unwrap().as_str().to_string();
t = 4;
} }
None => { None => {
match R_IPV6.captures(l.as_str()) { match R_IPV6.captures(l.as_str()) {
Some(sv6) => { Some(sv6) => {
s_ipaddr = sv6.get(0).unwrap().as_str().to_string(); s_ipaddr = sv6.get(0).unwrap().as_str().to_string();
t = 6;
} }
None => { None => {
continue; continue;
@ -149,14 +171,6 @@ pub fn filter(
} }
}; };
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
let s_date: DateTime<Local>; let s_date: DateTime<Local>;
match R_DATE.captures(l.as_str()) { match R_DATE.captures(l.as_str()) {
Some(sdt) => { Some(sdt) => {
@ -170,8 +184,22 @@ pub fn filter(
} }
}; };
let ipaddr: IpAddr = match s_ipaddr.parse() {
Ok(ip) => ip,
Err(err) => {
println!("unparseable IP: {err} {s_ipaddr}");
continue;
}
};
if !is_trusted(&ipaddr, &trustnets) { if !is_trusted(&ipaddr, &trustnets) {
iplist.push(ipdata!(t, s_ipaddr, src, s_date.to_rfc3339(), hostname)); list.push(IpData {
ip: s_ipaddr,
src: src.to_owned(),
date: s_date.to_rfc3339().to_owned(),
hostname: hostname.to_owned(),
mode: "file".to_owned(),
});
ips += 1; ips += 1;
}; };
} }
@ -181,24 +209,21 @@ pub fn filter(
} }
fn parse_date(input: regex::Captures) -> DateTime<Local> { fn parse_date(input: regex::Captures) -> DateTime<Local> {
let mut ymd: Vec<u32> = vec![]; let mut ymd: Vec<u64> = vec![];
let mut hms: Vec<u32> = vec![]; let mut hms: Vec<u64> = vec![];
let ymd_range = 2..5;
let hms_range = 5..8;
for cap in ymd_range { let (daterange, hourrange) = (2..5, 5..8);
ymd.push(input.get(cap).unwrap().as_str().parse::<u32>().unwrap());
for i in daterange {
ymd.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
} }
for cap in hms_range { for i in hourrange {
hms.push(input.get(cap).unwrap().as_str().parse::<u32>().unwrap()); hms.push(input.get(i).unwrap().as_str().parse::<u64>().unwrap());
} }
let date: DateTime<Local> = let date = Local
match Local.with_ymd_and_hms(ymd[0] as i32, ymd[1], ymd[2], hms[0], hms[1], hms[2]) { .ymd(ymd[0] as i32, ymd[1] as u32, ymd[2] as u32)
LocalResult::Single(s) => s, .and_hms(hms[0] as u32, hms[1] as u32, hms[2] as u32);
LocalResult::Ambiguous(a, _b) => a,
LocalResult::None => Local::now().trunc_subsecs(0),
};
date date
} }
@ -210,3 +235,24 @@ fn is_trusted(ip: &IpAddr, trustnets: &Vec<IpNet>) -> bool {
} }
false false
} }
pub async fn _get_last(ctx: &Context) -> Result<Vec<IpData>, ReqError> {
let resp = ctx
.client
.get(format!("{server}/ips/last", server = ctx.flags.server))
.query(&[("interval", "3 hours")])
.send()
.await;
let req = match resp {
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
}

View File

@ -1,230 +1,135 @@
use crate::config::{Context, GIT_VERSION}; use crate::config::{Context, GIT_VERSION};
use crate::fw::*; use crate::fw::{block, init};
use crate::ip::{filter, IpData, IpEvent}; use crate::ip::{filter, push_ip, IpData};
use crate::ipevent;
use crate::monitoring::apiserver;
use crate::utils::{gethostname, read_lines, sleep_s}; use crate::utils::{gethostname, read_lines, sleep_s};
use crate::webservice::send_to_ipbl_api; use crate::zmqcom::zconnect;
use crate::websocket::{send_to_ipbl_websocket, websocketpubsub, websocketreqrep};
use std::{collections::HashMap, sync::Arc};
use chrono::prelude::*; use chrono::prelude::*;
use chrono::prelude::{DateTime, Local}; use chrono::prelude::{DateTime, Local};
use chrono::Duration; use chrono::Duration;
use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent}; use nix::sys::inotify::InotifyEvent;
use sd_notify::*; use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock; use tokio::sync::Mutex;
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
const BL_CHAN_SIZE: usize = 32; const BL_CHAN_SIZE: usize = 32;
const WS_CHAN_SIZE: usize = 64; const ZMQ_CHAN_SIZE: usize = 64;
macro_rules! log_with_systemd {
($msg:expr) => {
println!("{}", $msg);
notify(false, &[NotifyState::Status(format!("{}", $msg).as_str())]).unwrap();
};
}
pub async fn run() { pub async fn run() {
let inotify = Inotify::init(InitFlags::empty()).unwrap(); let ctx = Arc::new(Mutex::new(Context::new().await));
let globalctx = Context::new(&inotify).await; println!(
let ctxarc = Arc::new(RwLock::new(globalctx)); "Launching {}, version {}",
let (batch4, _) = fwglobalinit(FwTableType::IPv4, true); env!("CARGO_PKG_NAME"),
let (batch6, _) = fwglobalinit(FwTableType::IPv6, true); format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION)
batch4.send().unwrap(); );
batch6.send().unwrap();
let (ipdatatx, mut ipdatarx): (Sender<IpData>, Receiver<IpData>) = channel(ZMQ_CHAN_SIZE);
// initialize the firewall table
init(&env!("CARGO_PKG_NAME").to_string());
let mut fwlen: usize = 0; let mut fwlen: usize = 0;
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); // initialize zeromq sockets
let reqsocket;
let subsocket;
{
let ctxarc = Arc::clone(&ctx);
let zmqctx = ctxarc.lock().await;
reqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await
.unwrap();
subsocket = zconnect(&zmqctx.cfg.zmq.get("pubsub").unwrap(), zmq::SUB)
.await
.unwrap();
}
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0); listenpubsub(&ctx, ipdatatx.clone(), subsocket).await;
log_with_systemd!(format!("Launching {}, version {}", PKG_NAME, pkgversion));
let ctxapi = Arc::clone(&ctxarc); let mut blrx = watchfiles(&ctx).await;
apiserver(&ctxapi).await.unwrap();
// initialize sockets let ctxarc = Arc::clone(&ctx);
let (ipeventtx, mut ipeventrx): (Sender<IpEvent>, Receiver<IpEvent>) = channel(WS_CHAN_SIZE);
let ipeventtxarc = Arc::new(RwLock::new(ipeventtx));
// init pubsub
let ctxwsps = Arc::clone(&ctxarc);
let ipeventws = Arc::clone(&ipeventtxarc);
websocketpubsub(&ctxwsps, ipeventws).await;
let ctxwsrr = Arc::clone(&ctxarc);
let mut wssocketrr = websocketreqrep(&ctxwsrr).await;
// init file watcher
let inoarc = Arc::new(RwLock::new(inotify));
let inoclone = Arc::clone(&inoarc);
let mut blrx = watchfiles(inoclone).await;
let ctxclone = Arc::clone(&ctxarc);
let ipeventclone = Arc::clone(&ipeventtxarc);
tokio::spawn(async move { tokio::spawn(async move {
compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await; compare_files_changes(&ctxarc, &mut blrx, &ipdatatx).await;
}); });
notify(false, &[NotifyState::Ready]).unwrap(); let mut ip_init = IpData {
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
mode: "init".to_string(),
};
send_to_ipbl_zmq(&reqsocket, &mut ip_init).await;
loop { loop {
let mut ret: Vec<String> = Vec::new(); let mut ret: Vec<String> = Vec::new();
let ctxclone = Arc::clone(&ctxarc); let begin: DateTime<Local> = Local::now().trunc_subsecs(0);
let reloadinterval; // wait for logs parse and zmq channel receive
{ let mut received_ip = ipdatarx.recv().await.unwrap();
let ctx = ctxclone.read().await;
reloadinterval = ctx.reloadinterval; // lock the context mutex
let ctxarc = Arc::clone(&ctx);
let mut ctx = ctxarc.lock().await;
if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() {
for ip_to_send in &mut ctx.get_blocklist_toblock().await {
ip_to_send.mode = "init".to_string();
send_to_ipbl_zmq(&reqsocket, ip_to_send).await;
} }
continue;
tokio::select! {
ipevent = ipeventrx.recv() => {
let received_ip = ipevent.unwrap();
let (toblock,server) = {
let ctx = ctxclone.read().await;
(ctx.get_blocklist_toblock(true).await,ctx.flags.server.clone())
};
if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in toblock {
let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send.ipdata));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
break;
}
}
continue
} }
// refresh context blocklist // refresh context blocklist
let filtered_ipevent = { let filtered_ip = ctx.update_blocklist(&mut received_ip).await;
ctxarc.write().await.update_blocklist(&received_ip).await ctx.gc_blocklist().await;
};
// send ip list to api and ws sockets // send ip list to ws and zmq sockets
if let Some(ipevent) = filtered_ipevent { if let Some(mut ip) = filtered_ip {
if received_ip.msgtype != "init" { send_to_ipbl_ws(&ctx, &mut ip, &mut ret).await;
log_with_systemd!(format!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip)); send_to_ipbl_zmq(&reqsocket, &mut ip).await;
let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata);
send_to_ipbl_api(&server.clone(), &ipe).await;
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
continue;
} }
}
}
}
_val = sleep_s(reloadinterval) => {
let ipe = ipevent!("ping", "ws", gethostname(true));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
}
}
};
let ctxclone = Arc::clone(&ctxarc); // apply firewall blocking
let ipstounblock = { block(
let mut ctx = ctxclone.write().await; &env!("CARGO_PKG_NAME").to_string(),
ctx.gc_blocklist().await &ctx.get_blocklist_toblock().await,
}; &mut ret,
let ipstoblock = { &mut fwlen,
let ctx = ctxclone.read().await; )
ctx.get_blocklist_toblock(false).await
};
get_current_rules(&ctxarc, &mut ret, &mut fwlen)
.await
.unwrap(); .unwrap();
for ip in ipstoblock {
match fwblock(&ip) {
Ok(ip) => {
let mut ctx = ctxclone.write().await;
if let Some(x) = ctx.blocklist.get_mut(ip) {
x.blocked = true;
}
}
Err(e) => {
println!("err: {e}, unable to push firewall rules, use super user")
}
};
}
for ip in ipstounblock {
if ip.blocked {
match fwunblock(&ip) {
Ok(_) => {}
Err(e) => {
println!("err: {e}, unable to push firewall rules, use super user")
}
};
}
}
// log lines // log lines
if ret.len() > 0 { if ret.len() > 0 {
let result = ret.join(", "); println!("{ret}", ret = ret.join(", "));
log_with_systemd!(format!("{result}"));
} }
let ctxclone = Arc::clone(&ctxarc); let end: DateTime<Local> = Local::now().trunc_subsecs(0);
let inoclone = Arc::clone(&inoarc); if (end - begin) > Duration::seconds(5) {
handle_cfg_reload(&ctxclone, reloadinterval, &mut last_cfg_reload, inoclone).await; // reload configuration from the server
match ctx.load().await {
Ok(_) => {}
Err(err) => {
println!("error loading config: {err}");
}
}
}
} }
} }
async fn handle_cfg_reload( async fn watchfiles(ctx: &Arc<Mutex<Context>>) -> Receiver<FileEvent> {
ctxclone: &Arc<RwLock<Context>>,
reloadinterval: u64,
last_cfg_reload: &mut DateTime<Local>,
inoarc: Arc<RwLock<Inotify>>,
) {
let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(reloadinterval as i64) {
let inotify;
loop {
inotify = match inoarc.try_read() {
Ok(o) => o,
Err(e) => {
println!("{e}");
sleep_s(1).await;
continue;
}
};
break;
}
let mut ctx = match ctxclone.try_write() {
Ok(o) => o,
Err(e) => {
println!("{e}");
return;
}
};
match ctx.load(&inotify).await {
Ok(_) => {
*last_cfg_reload = Local::now().trunc_subsecs(0);
}
Err(_) => {
println!("error reloading config");
}
}
};
}
async fn watchfiles(inoarc: Arc<RwLock<Inotify>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE); let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
let ctx = Arc::clone(ctx);
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
let events = inoarc.read().await.read_events().unwrap(); let events: Vec<InotifyEvent>;
{
let c = ctx.lock().await;
let instance = c.instance.clone();
drop(c);
events = instance.read_events().unwrap();
}
for inevent in events { for inevent in events {
let date: DateTime<Local> = Local::now().trunc_subsecs(0); let date: DateTime<Local> = Local::now().trunc_subsecs(0);
@ -237,36 +142,34 @@ async fn watchfiles(inoarc: Arc<RwLock<Inotify>>) -> Receiver<FileEvent> {
async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, bool) { async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, bool) {
let currentlen = match std::fs::metadata(&path.to_string()) { let currentlen = match std::fs::metadata(&path.to_string()) {
Ok(u) => u.len(), Ok(u) => u.len().clone(),
Err(_) => 0u64, Err(_) => 0u64,
}; };
let lastlen = match w.insert(path.to_string(), currentlen) { let lastlen = match w.insert(path.to_string(), currentlen) {
Some(u) => u, Some(u) => u,
None => currentlen, None => 0u64,
}; };
(lastlen, lastlen != currentlen) (lastlen, lastlen != currentlen)
} }
async fn compare_files_changes( async fn compare_files_changes(
ctxarc: &Arc<RwLock<Context>>, ctx: &Arc<Mutex<Context>>,
inrx: &mut Receiver<FileEvent>, inrx: &mut Receiver<FileEvent>,
ipeventtx: &Arc<RwLock<Sender<IpEvent>>>, ipdatatx: &Sender<IpData>,
) { ) {
let mut tnets; let mut tnets;
loop { loop {
let modfiles = inrx.recv().await.unwrap(); let modfiles = inrx.recv().await.unwrap();
let mut iplist: Vec<IpData> = vec![]; let mut iplist: Vec<IpData> = vec![];
let sas = { let mut ctx = ctx.lock().await;
let ctx = ctxarc.read().await;
tnets = ctx.cfg.build_trustnets(); tnets = ctx.cfg.build_trustnets();
ctx.sas.clone()
};
match modfiles.inevent.name { match modfiles.inevent.name {
Some(name) => { Some(name) => {
let filename = name.to_str().unwrap(); let filename = name.to_str().unwrap();
for (sak, sa) in sas.clone().iter_mut() { for sak in &mut ctx.clone().sas.keys() {
let sa = &mut ctx.sas.get_mut(sak).unwrap();
if modfiles.inevent.wd == sa.wd { if modfiles.inevent.wd == sa.wd {
let handle: String; let handle: String;
if sa.filename.as_str() == "" { if sa.filename.as_str() == "" {
@ -277,11 +180,8 @@ async fn compare_files_changes(
continue; continue;
} }
let (filesize, sizechanged) = { let (filesize, sizechanged) =
let mut ctx = ctxarc.write().await; get_last_file_size(&mut sa.watchedfiles, &handle).await;
let sa = ctx.sas.get_mut(sak).unwrap();
get_last_file_size(&mut sa.watchedfiles, &handle).await
};
if !sizechanged { if !sizechanged {
continue; continue;
@ -304,9 +204,7 @@ async fn compare_files_changes(
} }
} }
for ip in iplist { for ip in iplist {
let ipe = ipevent!("add", "file", gethostname(true), Some(ip)); ipdatatx.send(ip).await.unwrap();
let ipetx = ipeventtx.read().await;
ipetx.send(ipe).await.unwrap();
} }
} }
None => {} None => {}
@ -324,3 +222,79 @@ impl std::fmt::Debug for FileEvent {
write!(f, "{ie:?}", ie = self.inevent) write!(f, "{ie:?}", ie = self.inevent)
} }
} }
async fn send_to_ipbl_zmq(reqsocket: &zmq::Socket, ip: &mut IpData) {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
match reqsocket.send(&msg, 0) {
Ok(_) => {}
Err(e) => {
println!("{e:?}")
}
};
match reqsocket.recv_string(0) {
Ok(o) => match o {
Ok(_) => {}
Err(ee) => {
println!("{ee:?}")
}
},
Err(e) => {
println!("{e:?}")
}
};
}
async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec<String>) {
ret.push(format!("host: {hostname}", hostname = ctx.hostname));
loop {
match push_ip(&ctx, &ip, ret).await {
Ok(_) => {
break;
}
Err(err) => {
println!("{err}");
sleep_s(1);
}
};
}
}
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
let ctx = ctx.lock().await;
let prefix = format!(
"{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
);
socket
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
.expect("failed setting subscription");
drop(ctx);
tokio::spawn(async move {
loop {
let msgs: Option<String> = match socket.recv_string(0) {
Ok(s) => match s {
Ok(ss) => Some(ss),
Err(e) => {
println!("{e:?}");
None
}
},
Err(e) => {
println!("{e:?}");
None
}
};
match msgs {
Some(ss) => {
let msg = ss.strip_prefix(prefix.as_str()).unwrap();
let tosend: IpData = serde_json::from_str(msg).unwrap();
if tosend.hostname != gethostname(true) || tosend.mode == "init".to_string() {
txpubsub.send(tosend).await.unwrap();
}
}
None => {}
};
}
});
}

View File

@ -2,12 +2,11 @@ mod config;
mod fw; mod fw;
mod ip; mod ip;
mod ipblc; mod ipblc;
mod monitoring;
mod utils; mod utils;
mod webservice; mod zmqcom;
mod websocket;
#[tokio::main] #[tokio::main]
pub async fn main() { pub async fn main() {
// Create a new context
ipblc::run().await; ipblc::run().await;
} }

View File

@ -1,75 +0,0 @@
use crate::config::Context;
use std::{io, sync::Arc};
use serde_json;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxarc = ctxarc.clone();
let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() };
let listener = match TcpListener::bind(addr).await {
Ok(o) => o,
Err(e) => {
println!("error: {e}");
std::process::exit(1);
}
};
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((mut socket, _addr)) => {
let mut buf = vec![0; 1024];
match socket.readable().await {
Ok(_) => {
match socket.try_read(&mut buf) {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
continue;
}
};
}
Err(e) => {
println!("error: {e}");
continue;
}
}
let msg = match String::from_utf8(buf.to_vec()) {
Ok(o) => o.trim_matches(char::from(0)).trim().to_string(),
Err(_) => "".to_string(),
};
let res = format_result(&ctxarc, msg.as_str()).await;
match socket.write_all(res.as_bytes()).await {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
}
}
}
Err(err) => {
println!("error: {err}");
}
}
}
});
Ok(())
}
async fn format_result(ctxarc: &Arc<RwLock<Context>>, mode: &str) -> String {
let data;
let ctx = ctxarc.read().await;
match mode {
"cfg" => data = serde_json::to_string(&ctx.cfg).unwrap(),
"blocklist" => data = serde_json::to_string(&ctx.blocklist).unwrap(),
_ => data = serde_json::to_string(&ctx.blocklist).unwrap(),
};
data
}

View File

@ -1 +1 @@
(((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*)|(((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?) ((^\s*((([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))\s*$)|(^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$))

View File

@ -1,13 +1,21 @@
use std::{boxed::Box, fs::File, io::*}; use lazy_static::lazy_static;
use nix::unistd; use nix::unistd;
use tokio::time::{sleep, Duration}; use regex::Regex;
use std::boxed::Box;
use std::fs::File;
use std::io::*;
use std::path::Path;
use std::time::Duration;
lazy_static! {
static ref R_FILE_GZIP: Regex = Regex::new(r".*\.gz.*").unwrap();
}
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> { pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
let mut file = match File::open(filename) { let mut file = match File::open(filename) {
Ok(o) => o, Ok(f) => f,
Err(e) => { Err(err) => {
println!("error: {e}"); println!("{err}");
return None; return None;
} }
}; };
@ -16,13 +24,21 @@ pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
Some(lines) Some(lines)
} }
pub async fn sleep_s(s: u64) { pub fn _dedup<T: Ord + PartialOrd>(list: &mut Vec<T>) -> usize {
sleep(Duration::from_secs(s)).await; // Begin with sorting entries
list.sort();
// Then deduplicate
list.dedup();
// Return the length
list.len()
} }
#[allow(dead_code)] pub fn _sleep_ms(ms: u64) {
pub async fn sleep_ms(m: u64) { std::thread::sleep(Duration::from_millis(ms));
sleep(Duration::from_millis(m)).await; }
pub fn sleep_s(s: u64) {
std::thread::sleep(Duration::from_secs(s));
} }
pub fn gethostname(show_fqdn: bool) -> String { pub fn gethostname(show_fqdn: bool) -> String {
@ -37,3 +53,19 @@ pub fn gethostname(show_fqdn: bool) -> String {
} }
hostname[0].to_string() hostname[0].to_string()
} }
pub fn _search_subfolders(path: &Path) -> Vec<String> {
let dirs = std::fs::read_dir(path).unwrap();
let mut folders: Vec<String> = vec![];
for dir in dirs {
let dirpath = dir.unwrap().path();
let path = Path::new(dirpath.as_path());
if path.is_dir() {
folders.push(dirpath.to_str().unwrap().to_string());
for f in _search_subfolders(path) {
folders.push(f);
}
}
}
folders
}

View File

@ -1,85 +0,0 @@
use crate::config::{httpclient, Context};
use crate::ip::{IpData, IpEvent};
use crate::utils::sleep_s;
use reqwest::Client;
use reqwest::Error as ReqError;
const MAX_FAILED_API_RATE: u64 = 10;
pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut try_req = 0;
let client = httpclient();
loop {
match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await {
Ok(_) => {
break;
}
Err(e) => {
println!("error: {e}");
sleep_s(1).await;
if try_req == MAX_FAILED_API_RATE {
break;
}
try_req += 1;
}
};
}
}
async fn push_ip(client: &Client, server: &str, ip: &IpData) -> Result<(), ReqError> {
let mut data: Vec<IpData> = vec![];
data.push(IpData {
t: ip.t,
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
});
client
.post(format!("{server}/ips"))
.json(&data)
.send()
.await?;
Ok(())
}
async fn _push_ip_bulk(
ctx: &Context,
ips: &Vec<IpData>,
ret: &mut Vec<String>,
) -> Result<(), ReqError> {
let result: String;
let mut data: Vec<IpData> = vec![];
for ip in ips {
data.push(IpData {
t: ip.t,
ip: ip.ip.to_string(),
src: ip.src.to_string(),
date: ip.date.to_string(),
hostname: ip.hostname.to_string(),
})
}
let resp = httpclient()
.post(format!("{server}/ips", server = ctx.flags.server))
.json(&data)
.send()
.await?;
ret.push(format!("status: {status}", status = resp.status()));
let res = resp.text().await.unwrap();
if res.trim().len() > 0 {
result = res.trim().to_string();
} else {
result = "".to_string();
}
ret.push(format!("response: {result}"));
Ok(())
}

View File

@ -1,142 +0,0 @@
use crate::config::{Context, WebSocketCfg};
use crate::ip::IpEvent;
use crate::utils::{gethostname, sleep_s};
use std::{
io::{self, Write},
net::TcpStream,
sync::Arc,
};
use serde_json::json;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tungstenite::stream::*;
use tungstenite::*;
pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event, wscfg);
{
let ctx = ctxarc.read().await;
bootstrap_event = ctx.cfg.bootstrap_event().clone();
wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
}
wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr;
}
pub async fn websocketpubsub(
ctxarc: &Arc<RwLock<Context>>,
txpubsub: Arc<RwLock<Sender<IpEvent>>>,
) {
let cfg;
{
let ctx = ctxarc.read().await;
cfg = ctx.cfg.ws.get("pubsub").unwrap().clone();
}
let mut websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, &gethostname(true)).await.unwrap(),
));
tokio::spawn(async move {
loop {
let mut ws = websocket.write().await;
match ws.read() {
Ok(msg) => {
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
Ok(o) => o,
Err(e) => {
println!("error in pubsub: {e:?}");
continue;
}
};
match tosend.ipdata.clone() {
Some(o) => {
if o.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
let txps = txpubsub.read().await;
txps.send(tosend).await.unwrap();
}
}
None => {
let txps = txpubsub.read().await;
txps.send(tosend.clone()).await.unwrap();
}
}
}
Err(e) => {
println!("error in pubsub: {e:?}");
ws.close(None).unwrap();
drop(ws);
websocket = Arc::new(RwLock::new(
websocketconnect(&cfg, &gethostname(true)).await.unwrap(),
));
}
};
}
});
}
pub async fn websocketconnect<'a>(
wscfg: &WebSocketCfg,
hostname: &String,
) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, Error> {
let endpoint = &wscfg.endpoint;
print!("connecting to {} ... ", endpoint);
io::stdout().flush().unwrap();
let mut socket;
loop {
(socket, _) = match connect(endpoint) {
Ok((o, e)) => (o, e),
_ => {
println!("error connecting to {endpoint}, retrying");
sleep_s(1).await;
continue;
}
};
break;
}
println!("connected to {endpoint}");
let msg = json!({ "hostname": hostname });
socket.send(Message::Text(msg.to_string().into())).unwrap();
Ok(socket)
}
pub async fn send_to_ipbl_websocket(
ws: &mut WebSocket<MaybeTlsStream<TcpStream>>,
ip: &IpEvent,
) -> bool {
let msg = format!("{val}", val = serde_json::to_string(&ip).unwrap());
if ws.can_write() {
match ws.send(Message::Text(msg.into())) {
Ok(_) => {}
Err(e) => {
println!("err send read: {e:?}");
return false;
}
};
} else {
println!("can't write to socket");
return false;
};
if ws.can_read() {
match ws.read() {
Ok(_) => {}
Err(e) => {
println!("err send read: {e:?}");
return false;
}
};
} else {
println!("can't read from socket");
return false;
};
true
}

13
src/zmqcom.rs Normal file
View File

@ -0,0 +1,13 @@
use crate::config::ZMQ;
const ZMQPROTO: &str = "tcp";
pub async fn zconnect(zmqcfg: &ZMQ, zmqtype: zmq::SocketType) -> Result<zmq::Socket, zmq::Error> {
let zctx = zmq::Context::new();
let zmqhost = &zmqcfg.hostname;
let zmqport = &zmqcfg.port;
let socket = zctx.socket(zmqtype).unwrap();
let connectstring = format!("{ZMQPROTO}://{zmqhost}:{zmqport}");
socket.connect(&connectstring.as_str())?;
Ok(socket)
}