replaced mutex with rwlock #3

Merged
paulbsd merged 1 commits from feature-rwlock into develop 2023-01-13 08:24:36 +01:00
4 changed files with 20 additions and 20 deletions
Showing only changes of commit f18ac68842 - Show all commits

View File

@ -21,5 +21,5 @@ regex = "1.7"
reqwest = { version = "0.11", default-features = false, features = ["json","rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json","rustls-tls"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
tokio = { version = "1.23", features = ["full"] } tokio = { version = "1.23", features = ["full", "sync"] }
zmq = "0.10" zmq = "0.10"

View File

@ -5,11 +5,11 @@ use std::io;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::TcpSocket; use tokio::net::TcpSocket;
use tokio::sync::Mutex; use tokio::sync::RwLock;
pub async fn apiserver(ctxarc: &Arc<Mutex<Context>>) -> io::Result<()> { pub async fn apiserver(ctxarc: &Arc<RwLock<Context>>) -> io::Result<()> {
let ctxclone = Arc::clone(ctxarc); let ctxclone = Arc::clone(ctxarc);
let ctx = ctxclone.lock().await; let ctx = ctxclone.read().await;
let addr = ctx.cfg.api.parse().unwrap(); let addr = ctx.cfg.api.parse().unwrap();
drop(ctx); drop(ctx);
@ -26,7 +26,7 @@ pub async fn apiserver(ctxarc: &Arc<Mutex<Context>>) -> io::Result<()> {
//let mut buf = [0; 1024]; //let mut buf = [0; 1024];
let data; let data;
{ {
let ctx = ctxclone.lock().await; let ctx = ctxclone.read().await;
data = serde_json::to_string(&ctx.blocklist); data = serde_json::to_string(&ctx.blocklist);
} }

View File

@ -13,7 +13,7 @@ use nix::sys::inotify::InotifyEvent;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex; use tokio::sync::RwLock;
pub const PKG_NAME: &str = env!("CARGO_PKG_NAME"); pub const PKG_NAME: &str = env!("CARGO_PKG_NAME");
const BL_CHAN_SIZE: usize = 32; const BL_CHAN_SIZE: usize = 32;
@ -21,7 +21,7 @@ const ZMQ_CHAN_SIZE: usize = 64;
const API_CHAN_SIZE: usize = 64; const API_CHAN_SIZE: usize = 64;
pub async fn run() { pub async fn run() {
let ctxarc = Arc::new(Mutex::new(Context::new().await)); let ctxarc = Arc::new(RwLock::new(Context::new().await));
let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION); let pkgversion = format!("{}@{}", env!("CARGO_PKG_VERSION"), GIT_VERSION);
@ -72,7 +72,7 @@ pub async fn run() {
val = ipdata_wait => { val = ipdata_wait => {
let mut received_ip = val.unwrap(); let mut received_ip = val.unwrap();
let mut ctx = ctxclone.lock().await; let mut ctx = ctxclone.write().await;
if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() { if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() {
for ip_to_send in &mut ctx.get_blocklist_toblock().await { for ip_to_send in &mut ctx.get_blocklist_toblock().await {
@ -97,7 +97,7 @@ pub async fn run() {
let toblock; let toblock;
{ {
let mut ctx = ctxarc.lock().await; let mut ctx = ctxarc.write().await;
ctx.gc_blocklist().await; ctx.gc_blocklist().await;
toblock = ctx.get_blocklist_toblock().await; toblock = ctx.get_blocklist_toblock().await;
} }
@ -113,7 +113,7 @@ pub async fn run() {
let now_cfg_reload = Local::now().trunc_subsecs(0); let now_cfg_reload = Local::now().trunc_subsecs(0);
if (now_cfg_reload - last_cfg_reload) > Duration::seconds(5) { if (now_cfg_reload - last_cfg_reload) > Duration::seconds(5) {
// reload configuration from the server // reload configuration from the server
let mut ctx = ctxclone.lock().await; let mut ctx = ctxclone.write().await;
match ctx.load().await { match ctx.load().await {
Ok(_) => { Ok(_) => {
last_cfg_reload = Local::now().trunc_subsecs(0); last_cfg_reload = Local::now().trunc_subsecs(0);
@ -128,7 +128,7 @@ pub async fn run() {
} }
} }
async fn watchfiles(ctxarc: &Arc<Mutex<Context>>) -> Receiver<FileEvent> { async fn watchfiles(ctxarc: &Arc<RwLock<Context>>) -> Receiver<FileEvent> {
let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE); let (bltx, blrx): (Sender<FileEvent>, Receiver<FileEvent>) = channel(BL_CHAN_SIZE);
let ctxclone = Arc::clone(ctxarc); let ctxclone = Arc::clone(ctxarc);
tokio::spawn(async move { tokio::spawn(async move {
@ -136,7 +136,7 @@ async fn watchfiles(ctxarc: &Arc<Mutex<Context>>) -> Receiver<FileEvent> {
let events; let events;
let instance; let instance;
{ {
let ctx = ctxclone.lock().await; let ctx = ctxclone.read().await;
instance = ctx.instance.clone(); instance = ctx.instance.clone();
} }
@ -164,7 +164,7 @@ async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> (u64, b
} }
async fn compare_files_changes( async fn compare_files_changes(
ctxarc: &Arc<Mutex<Context>>, ctxarc: &Arc<RwLock<Context>>,
inrx: &mut Receiver<FileEvent>, inrx: &mut Receiver<FileEvent>,
ipdatatx: &Sender<IpData>, ipdatatx: &Sender<IpData>,
) { ) {
@ -176,7 +176,7 @@ async fn compare_files_changes(
let sask; let sask;
let sas; let sas;
{ {
let ctx = ctxarc.lock().await; let ctx = ctxarc.read().await;
sas = ctx.clone().sas; sas = ctx.clone().sas;
sask = sas.keys(); sask = sas.keys();
tnets = ctx.cfg.build_trustnets(); tnets = ctx.cfg.build_trustnets();
@ -199,7 +199,7 @@ async fn compare_files_changes(
let (filesize, sizechanged); let (filesize, sizechanged);
{ {
let mut ctx = ctxarc.lock().await; let mut ctx = ctxarc.write().await;
let sa = ctx.sas.get_mut(sak).unwrap(); let sa = ctx.sas.get_mut(sak).unwrap();
(filesize, sizechanged) = (filesize, sizechanged) =
get_last_file_size(&mut sa.watchedfiles, &handle).await; get_last_file_size(&mut sa.watchedfiles, &handle).await;

View File

@ -4,7 +4,7 @@ use crate::utils::gethostname;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex; use tokio::sync::RwLock;
const ZMQPROTO: &str = "tcp"; const ZMQPROTO: &str = "tcp";
@ -21,13 +21,13 @@ pub async fn zconnect<'a>(
Ok(socket) Ok(socket)
} }
pub async fn zmqinit(ctx: &Arc<Mutex<Context>>, ipdatatx: &Sender<IpData>) -> zmq::Socket { pub async fn zmqinit(ctx: &Arc<RwLock<Context>>, ipdatatx: &Sender<IpData>) -> zmq::Socket {
let ctxarc = Arc::clone(&ctx); let ctxarc = Arc::clone(&ctx);
let zmqreqsocket; let zmqreqsocket;
let zmqsubsocket; let zmqsubsocket;
{ {
let zmqctx = ctxarc.lock().await; let zmqctx = ctxarc.read().await;
zmqreqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ) zmqreqsocket = zconnect(&zmqctx.cfg.zmq.get("reqrep").unwrap(), zmq::REQ)
.await .await
.unwrap(); .unwrap();
@ -40,10 +40,10 @@ pub async fn zmqinit(ctx: &Arc<Mutex<Context>>, ipdatatx: &Sender<IpData>) -> zm
return zmqreqsocket; return zmqreqsocket;
} }
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) { async fn listenpubsub(ctx: &Arc<RwLock<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
let prefix; let prefix;
{ {
let ctx = ctx.lock().await; let ctx = ctx.read().await;
prefix = format!( prefix = format!(
"{sub} ", "{sub} ",
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription