Compare commits

..

No commits in common. "master" and "1.6.0" have entirely different histories.

10 changed files with 494 additions and 732 deletions

827
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package]
name = "ipblc"
version = "1.7.0"
version = "1.6.0"
edition = "2021"
authors = ["PaulBSD <paul@paulbsd.com>"]
description = "ipblc is a tool that search and send attacking ip addresses to ipbl"
@ -21,9 +21,8 @@ regex = "1.10"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sd-notify = { version = "0.4" }
tokio = { version = "1.35", features = ["full", "sync"] }
tungstenite = { version = "0.21", features = ["handshake", "rustls-tls-native-roots"] }
tokio = { version = "1.34", features = ["full", "sync"] }
tungstenite = { version = "0.20", features = ["handshake", "rustls-tls-native-roots"] }
## to optimize binary size (slow compile time)
#[profile.release]

View File

@ -17,7 +17,7 @@ use std::path::Path;
pub const GIT_VERSION: &str = git_version!(args = ["--always", "--dirty="]);
const MASTERSERVER: &str = "ipbl.paulbsd.com";
const WSSUBSCRIPTION: &str = "ipbl";
const CONFIG_RETRY_INTERVAL: u64 = 2;
const CONFIG_RETRY: u64 = 2;
const WEB_CLIENT_TIMEOUT: i64 = 5;
#[derive(Debug)]
@ -103,12 +103,12 @@ impl Context {
.send()
.await;
let req = match resp {
Ok(o) => o,
Err(e) => return Err(e),
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Discovery = match req.json().await {
Ok(o) => o,
Err(e) => return Err(e),
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
}
@ -127,10 +127,10 @@ impl Context {
}
break;
}
Err(e) => {
println!("error loading config: {e}, retrying in {CONFIG_RETRY_INTERVAL}s");
Err(err) => {
println!("error loading config: {err}, retrying in {CONFIG_RETRY}s");
last_in_err = true;
sleep_s(CONFIG_RETRY_INTERVAL).await;
sleep_s(CONFIG_RETRY).await;
}
};
}
@ -169,41 +169,38 @@ impl Context {
}
pub async fn update_blocklist(&mut self, ipevent: &IpEvent) -> Option<IpEvent> {
match &ipevent.ipdata {
Some(ipdata) => match self.cfg.sets.get(&ipdata.src) {
Some(set) => {
let starttime = DateTime::parse_from_rfc3339(ipdata.date.as_str())
.unwrap()
.with_timezone(&chrono::Local);
let blocktime = set.blocktime;
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
let block =
self.blocklist
.entry(ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipdata.clone(),
tryfail: 0,
starttime,
blocktime,
});
block.tryfail += 1;
block.blocktime = blocktime;
if block.tryfail >= set.tryfail {
return Some(ipevent.clone());
}
} else {
self.blocklist
.entry(ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
});
match self.cfg.sets.get(&ipevent.ipdata.src) {
Some(set) => {
let starttime = DateTime::parse_from_rfc3339(ipevent.ipdata.date.as_str())
.unwrap()
.with_timezone(&chrono::Local);
let blocktime = set.blocktime;
if ipevent.mode == "file".to_string() && gethostname(true) == ipevent.hostname {
let block = self
.blocklist
.entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipevent.ipdata.clone(),
tryfail: 0,
starttime,
blocktime,
});
block.tryfail += 1;
block.blocktime = blocktime;
if block.tryfail >= set.tryfail {
return Some(ipevent.clone());
}
} else {
self.blocklist
.entry(ipevent.ipdata.ip.to_string())
.or_insert(BlockIpData {
ipdata: ipevent.ipdata.clone(),
tryfail: set.tryfail,
starttime,
blocktime,
});
}
None => {}
},
}
None => {}
}
None
@ -365,12 +362,12 @@ impl Config {
.send()
.await;
let req = match resp {
Ok(o) => o,
Err(e) => return Err(e),
Ok(re) => re,
Err(err) => return Err(err),
};
let data: GlobalConfigV2 = match req.json::<GlobalConfigV2>().await {
Ok(o) => o,
Err(e) => return Err(e),
Ok(res) => res,
Err(err) => return Err(err),
};
for d in data.sets {
@ -400,13 +397,13 @@ impl Config {
.await;
let req = match resp {
Ok(o) => o,
Err(e) => return Err(e),
Ok(re) => re,
Err(err) => return Err(err),
};
let data: Vec<IpData> = match req.json::<Vec<IpData>>().await {
Ok(o) => o,
Err(e) => return Err(e),
Ok(res) => res,
Err(err) => return Err(err),
};
Ok(data)
@ -417,8 +414,8 @@ impl Config {
for trustnet in &self.trustnets {
match trustnet.parse() {
Ok(net) => trustnets.push(net),
Err(e) => {
println!("error parsing {trustnet}, error: {e}");
Err(err) => {
println!("error parsing {trustnet}, error: {err}");
}
};
}
@ -430,7 +427,13 @@ impl Config {
msgtype: String::from("bootstrap"),
mode: String::from("ws"),
hostname: gethostname(true),
ipdata: None,
ipdata: IpData {
t: 4,
ip: "".to_string(),
src: "".to_string(),
date: "".to_string(),
hostname: "".to_string(),
},
}
}
}
@ -518,13 +521,13 @@ mod test {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
ipdata: IpData {
t: 4,
ip: "1.1.1.1".to_string(),
hostname: "test1".to_string(),
date: now.to_rfc3339().to_string(),
src: "ssh".to_string(),
}),
},
})
.await;
}
@ -534,13 +537,13 @@ mod test {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
ipdata: IpData {
t: 4,
ip: "1.1.1.2".to_string(),
hostname: "test2".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
},
})
.await;
}
@ -549,13 +552,13 @@ mod test {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
ipdata: IpData {
t: 4,
ip: "1.1.1.3".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
},
})
.await;
@ -563,13 +566,13 @@ mod test {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
ipdata: IpData {
t: 4,
ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
},
})
.await;
@ -577,26 +580,26 @@ mod test {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
ipdata: IpData {
t: 4,
ip: "1.1.1.4".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
},
})
.await;
ctx.update_blocklist(&mut IpEvent {
msgtype: String::from("add"),
mode: String::from("ws"),
hostname: String::from("localhost"),
ipdata: Some(IpData {
ipdata: IpData {
t: 6,
ip: "2a00:1450:4007:805::2003".to_string(),
hostname: "testgood".to_string(),
date: now.to_rfc3339().to_string(),
src: "http".to_string(),
}),
},
})
.await;

View File

@ -21,29 +21,45 @@ pub fn fwglobalinit<'a>() -> ((Batch, Table), (Batch, Table)) {
macro_rules! initrules {
($batch:expr, $table:expr, $chain:ident) => {
let mut $chain = Chain::new(&CString::new(PKG_NAME).unwrap(), &$table);
$chain.set_hook(nftnl::Hook::In, 1);
$chain.set_policy(nftnl::Policy::Accept);
$batch.add(&$chain, nftnl::MsgType::Add);
$batch.add(&Rule::new(&$chain), nftnl::MsgType::Del);
let mut rule = Rule::new(&$chain);
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 4u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
rule.add_expr(&nft_expr!(counter));
rule.add_expr(&nft_expr!(verdict accept));
$batch.add(&rule, nftnl::MsgType::Add);
};
}
};}
macro_rules! createrules {
($ipdata:ident, $chain:ident, $batch:ident, $t:ty, $ip_t:ident) => {
($ipdata:ident, $chain:ident, $batch:ident) => {
let mut rule = Rule::new(&$chain);
let ip = $ipdata.ip.parse::<$t>().unwrap();
match $ipdata.t {
4 => {
let ip = $ipdata.ip.parse::<Ipv4Addr>().unwrap();
rule.add_expr(&nft_expr!(payload ipv4 saddr));
rule.add_expr(&nft_expr!(cmp == ip));
},
6 => {
let ip = $ipdata.ip.parse::<Ipv6Addr>().unwrap();
rule.add_expr(&nft_expr!(payload ipv6 saddr));
rule.add_expr(&nft_expr!(cmp == ip));
},
_ => {
let ip = $ipdata.ip.parse::<Ipv4Addr>().unwrap();
rule.add_expr(&nft_expr!(payload ipv4 saddr));
rule.add_expr(&nft_expr!(cmp == ip));
},
};
rule.add_expr(&nft_expr!(payload $ip_t saddr));
rule.add_expr(&nft_expr!(cmp == ip));
rule.add_expr(&nft_expr!(ct state));
rule.add_expr(&nft_expr!(bitwise mask 10u32, xor 0u32));
rule.add_expr(&nft_expr!(cmp != 0u32));
@ -83,62 +99,30 @@ fn fwinit(t: FwTableType) -> (Batch, Table) {
}
pub fn fwblock(
ips_add_all: &Vec<IpData>,
ips_add: &Vec<IpData>,
ret: &mut Vec<String>,
fwlen: &mut usize,
) -> std::result::Result<(), Error> {
let ((mut batch4, table4), (mut batch6, table6)) = fwglobalinit();
let mut chain4 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table4);
let mut chain6 = Chain::new(&CString::new(PKG_NAME).unwrap(), &table6);
initrules!(batch4, table4, chain4);
initrules!(batch6, table6, chain6);
let mut factor = 1;
if ips_add_all.len() > 100 {
factor = (ips_add_all.len() / 10) as usize
}
let ips_add_tmp: Vec<IpData> = ips_add_all.clone().iter().map(|x| x.clone()).collect();
let mut ips_add_iter = ips_add_tmp.chunks(factor);
let mut ips_add: Vec<&[IpData]> = vec![];
while let Some(x) = ips_add_iter.next() {
ips_add.push(x);
}
// build and add rules
for ipdata_group in ips_add.clone() {
for ipdata in ipdata_group {
match ipdata.t {
4 => {
createrules!(ipdata, chain4, batch4, Ipv4Addr, ipv4);
}
6 => {
createrules!(ipdata, chain6, batch6, Ipv6Addr, ipv6);
}
_ => {}
}
}
for ipdata in ips_add.clone() {
createrules!(ipdata, chain4, batch4);
createrules!(ipdata, chain6, batch6);
}
// validate and send batch
for b in [batch4, batch6] {
let bf = b.finalize();
match send_and_process(&bf) {
Ok(_) => {}
Err(e) => {
println!("error sending batch: {e}");
}
};
send_and_process(&bf).unwrap();
}
if fwlen != &mut ips_add_all.len() {
ret.push(format!(
"{length} ip in firewall",
length = ips_add_all.len()
));
if fwlen != &mut ips_add.len() {
ret.push(format!("{length} ip in firewall", length = ips_add.len()));
}
*fwlen = ips_add_all.len();
*fwlen = ips_add.len();
Ok(())
}

View File

@ -22,7 +22,7 @@ pub struct IpEvent {
pub msgtype: String,
pub mode: String,
pub hostname: String,
pub ipdata: Option<IpData>,
pub ipdata: IpData,
}
#[macro_export]
@ -35,14 +35,6 @@ macro_rules! ipevent {
ipdata: $ipdata,
}
};
($msgtype:expr,$mode:expr,$hostname:expr) => {
IpEvent {
msgtype: String::from($msgtype),
mode: String::from($mode),
hostname: $hostname,
ipdata: None,
}
};
}
#[derive(Clone, Debug, Serialize, Deserialize)]

View File

@ -11,7 +11,6 @@ use chrono::prelude::*;
use chrono::prelude::{DateTime, Local};
use chrono::Duration;
use nix::sys::inotify::{InitFlags, Inotify, InotifyEvent};
use sd_notify::*;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender};
@ -22,13 +21,6 @@ const BL_CHAN_SIZE: usize = 32;
const WS_CHAN_SIZE: usize = 64;
const LOOP_MAX_WAIT: u64 = 5;
macro_rules! log_with_systemd {
($msg:expr) => {
println!("{}", $msg);
notify(false, &[NotifyState::Status(format!("{}", $msg).as_str())]).unwrap();
};
}
pub async fn run() {
let inotify = Inotify::init(InitFlags::empty()).unwrap();
let globalctx = Context::new(&inotify).await;
@ -39,7 +31,7 @@ pub async fn run() {
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
let mut last_cfg_reload: DateTime<Local> = Local::now().trunc_subsecs(0);
log_with_systemd!(format!("Launching {}, version {}", PKG_NAME, pkgversion));
println!("Launching {}, version {}", PKG_NAME, pkgversion);
fwglobalinit();
let ctxapi = Arc::clone(&ctxarc);
@ -68,8 +60,6 @@ pub async fn run() {
compare_files_changes(&ctxclone, &mut blrx, &ipeventclone).await;
});
notify(false, &[NotifyState::Ready]).unwrap();
loop {
let mut ret: Vec<String> = Vec::new();
@ -86,9 +76,8 @@ pub async fn run() {
if received_ip.msgtype == "bootstrap".to_string() {
for ip_to_send in toblock {
let ipe = ipevent!("init","ws",gethostname(true),Some(ip_to_send));
let ipe = ipevent!("init","ws",gethostname(true),ip_to_send);
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
break;
}
@ -104,24 +93,18 @@ pub async fn run() {
// send ip list to api and ws sockets
if let Some(ipevent) = filtered_ipevent {
if received_ip.msgtype != "init" {
log_with_systemd!(format!("sending {} to api and ws", ipevent.ipdata.clone().unwrap().ip));
println!("sending {} to api and ws", ipevent.ipdata.ip);
let ipe = ipevent!("add","ws",gethostname(true),ipevent.ipdata);
send_to_ipbl_api(&server.clone(), &ipe).await;
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
let status = send_to_ipbl_websocket(&mut wssocketrr, &ipe).await;
if !status {
wssocketrr = websocketreqrep(&ctxwsrr).await;
continue;
}
}
}
}
_val = sleep_s(LOOP_MAX_WAIT) => {
let ipe = ipevent!("ping", "ws", gethostname(true));
if !send_to_ipbl_websocket(&mut wssocketrr, &ipe).await {
wssocketrr.close(None).unwrap();
wssocketrr = websocketreqrep(&ctxwsrr).await;
}
}
_val = sleep_s(LOOP_MAX_WAIT) => {}
};
let ctxclone = Arc::clone(&ctxarc);
@ -129,8 +112,7 @@ pub async fn run() {
// log lines
if ret.len() > 0 {
let result = ret.join(", ");
log_with_systemd!(format!("{result}"));
println!("{ret}", ret = ret.join(", "));
}
let ctxclone = Arc::clone(&ctxarc);
@ -146,26 +128,8 @@ async fn handle_cfg_reload(
) {
let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - *last_cfg_reload) > Duration::seconds(LOOP_MAX_WAIT as i64) {
let inotify;
loop {
inotify = match inoarc.try_read() {
Ok(o) => o,
Err(e) => {
println!("{e}");
sleep_s(1).await;
continue;
}
};
break;
}
let mut ctxtest = match ctxclone.try_write() {
Ok(o) => o,
Err(e) => {
println!("{e}");
return;
}
};
match ctxtest.load(&inotify).await {
let inotify = inoarc.read().await;
match ctxclone.write().await.load(&inotify).await {
Ok(_) => {
*last_cfg_reload = Local::now().trunc_subsecs(0);
}
@ -189,8 +153,8 @@ async fn handle_fwblock(ctxclone: Arc<RwLock<Context>>, ret: &mut Vec<String>, f
// apply firewall blocking
match fwblock(&toblock, ret, fwlen) {
Ok(_) => {}
Err(e) => {
println!("err: {e}, unable to push firewall rules, use super user")
Err(err) => {
println!("Err: {err}, unable to push firewall rules, use super user")
}
};
}
@ -241,7 +205,8 @@ async fn compare_files_changes(
match modfiles.inevent.name {
Some(name) => {
let filename = name.to_str().unwrap();
for (sak, sa) in sas.clone().iter_mut() {
for sak in sas.clone().keys() {
let sa = sas.get(sak).unwrap();
if modfiles.inevent.wd == sa.wd {
let handle: String;
if sa.filename.as_str() == "" {
@ -279,7 +244,7 @@ async fn compare_files_changes(
}
}
for ip in iplist {
let ipe = ipevent!("add", "file", gethostname(true), Some(ip));
let ipe = ipevent!("add", "file", gethostname(true), ip);
let ipetx = ipeventtx.read().await;
ipetx.send(ipe).await.unwrap();
}

View File

@ -4,42 +4,38 @@ use serde_json;
use std::io;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpSocket;
use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxarc = ctxarc.clone();
let addr: String = { ctxarc.read().await.cfg.api.parse().unwrap() };
let listener = match TcpListener::bind(addr).await {
Ok(o) => o,
Err(e) => {
println!("error: {e}");
let addr = { ctxarc.read().await.cfg.api.parse().unwrap() };
let socket = TcpSocket::new_v4().unwrap();
match socket.bind(addr) {
Ok(_) => {}
Err(_) => {
println!("can't bind monitoring socket, exiting...");
std::process::exit(1);
}
};
}
socket.set_reuseaddr(true).unwrap();
let listener = socket.listen(128).unwrap();
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((mut socket, _addr)) => {
let mut buf = vec![0; 1024];
Ok((stream, _addr)) => {
stream.readable().await.unwrap();
let (reader, mut writer) = stream.into_split();
let mut buf: [u8; 16] = [0; 16];
match socket.readable().await {
Ok(_) => {
match socket.try_read(&mut buf) {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
continue;
}
};
}
match reader.try_read(&mut buf) {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
continue;
println!("error: {}", e);
}
}
};
let msg = match String::from_utf8(buf.to_vec()) {
Ok(o) => o.trim_matches(char::from(0)).trim().to_string(),
Err(_) => "".to_string(),
@ -47,15 +43,15 @@ pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let res = format_result(&ctxarc, msg.as_str()).await;
match socket.write_all(res.as_bytes()).await {
match writer.write_all(format!("{res}").as_bytes()).await {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
Err(err) => {
println!("ee {err}");
}
}
}
Err(err) => {
println!("error: {err}");
println!("unable to serialize data: {err}");
}
}
}

View File

@ -6,9 +6,9 @@ use tokio::time::{sleep, Duration};
pub fn read_lines(filename: &String, offset: u64) -> Option<Box<dyn Read>> {
let mut file = match File::open(filename) {
Ok(o) => o,
Err(e) => {
println!("error: {e}");
Ok(f) => f,
Err(err) => {
println!("{err}");
return None;
}
};
@ -21,11 +21,6 @@ pub async fn sleep_s(s: u64) {
sleep(Duration::from_secs(s)).await;
}
#[allow(dead_code)]
pub async fn sleep_ms(m: u64) {
sleep(Duration::from_millis(m)).await;
}
pub fn gethostname(show_fqdn: bool) -> String {
let hostname_cstr = unistd::gethostname().expect("Failed getting hostname");
let fqdn = hostname_cstr

View File

@ -11,12 +11,12 @@ pub async fn send_to_ipbl_api(server: &str, ip: &IpEvent) {
let mut try_req = 0;
let client = httpclient();
loop {
match push_ip(&client, &server, &ip.ipdata.clone().unwrap()).await {
match push_ip(&client, &server, &ip.ipdata).await {
Ok(_) => {
break;
}
Err(e) => {
println!("error: {e}");
Err(err) => {
println!("{err}");
sleep_s(1).await;
if try_req == MAX_FAILED_API_RATE {
break;

View File

@ -14,13 +14,13 @@ use tungstenite::*;
pub async fn websocketreqrep(
ctxarc: &Arc<RwLock<Context>>,
) -> WebSocket<MaybeTlsStream<TcpStream>> {
let (mut wssocketrr, bootstrap_event, wscfg);
let (mut wssocketrr, bootstrap_event, cfg);
{
let ctx = ctxarc.read().await;
bootstrap_event = ctx.cfg.bootstrap_event().clone();
wscfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
cfg = ctx.cfg.ws.get("reqrep").unwrap().clone();
}
wssocketrr = websocketconnect(&wscfg, &gethostname(true)).await.unwrap();
wssocketrr = websocketconnect(&cfg, &gethostname(true)).await.unwrap();
send_to_ipbl_websocket(&mut wssocketrr, &bootstrap_event).await;
return wssocketrr;
@ -45,24 +45,15 @@ pub async fn websocketpubsub(
Ok(msg) => {
let tosend: IpEvent = match serde_json::from_str(msg.to_string().as_str()) {
Ok(o) => o,
Err(e) => {
println!("error in pubsub: {e:?}");
Err(_e) => {
continue;
}
};
match tosend.ipdata.clone() {
Some(o) => {
if o.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
let txps = txpubsub.read().await;
txps.send(tosend).await.unwrap();
}
}
None => {
let txps = txpubsub.read().await;
txps.send(tosend.clone()).await.unwrap();
}
if tosend.ipdata.hostname != gethostname(true)
|| tosend.msgtype == "init".to_string()
{
let txps = txpubsub.read().await;
txps.send(tosend).await.unwrap();
}
}
Err(e) => {
@ -114,12 +105,11 @@ pub async fn send_to_ipbl_websocket(
Ok(_) => {}
Err(e) => {
println!("err send read: {e:?}");
return false;
return handle_websocket_error(ws);
}
};
} else {
println!("can't write to socket");
return false;
return handle_websocket_error(ws);
};
if ws.can_read() {
@ -127,13 +117,16 @@ pub async fn send_to_ipbl_websocket(
Ok(_) => {}
Err(e) => {
println!("err send read: {e:?}");
return false;
return handle_websocket_error(ws);
}
};
} else {
println!("can't read from socket");
return false;
return handle_websocket_error(ws);
};
true
}
fn handle_websocket_error(ws: &mut WebSocket<MaybeTlsStream<TcpStream>>) -> bool {
ws.close(None).unwrap();
return false;
}