fixed file scanning bug
This commit is contained in:
parent
f7b902aacf
commit
319d246235
@ -217,7 +217,7 @@ impl Context {
|
|||||||
for (src, set) in self.cfg.sets.iter() {
|
for (src, set) in self.cfg.sets.iter() {
|
||||||
let p = Path::new(set.path.as_str());
|
let p = Path::new(set.path.as_str());
|
||||||
if p.is_dir() {
|
if p.is_dir() {
|
||||||
let res = match self.hashwd.get(&set.path.to_string()) {
|
let wd = match self.hashwd.get(&set.path.to_string()) {
|
||||||
Some(wd) => *wd,
|
Some(wd) => *wd,
|
||||||
None => {
|
None => {
|
||||||
let res = self
|
let res = self
|
||||||
@ -238,17 +238,27 @@ impl Context {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
self.sas.insert(
|
match self.sas.get_mut(&src.clone()) {
|
||||||
src.clone(),
|
Some(s) => {
|
||||||
SetMap {
|
s.filename = set.filename.clone();
|
||||||
filename: set.filename.clone(),
|
s.fullpath = fullpath;
|
||||||
fullpath,
|
s.set = set.clone();
|
||||||
set: set.clone(),
|
s.regex = Regex::new(set.regex.as_str()).unwrap();
|
||||||
regex: Regex::new(set.regex.as_str()).unwrap(),
|
}
|
||||||
wd: res,
|
None => {
|
||||||
watchedfiles: HashMap::new(),
|
self.sas.insert(
|
||||||
},
|
src.clone(),
|
||||||
);
|
SetMap {
|
||||||
|
filename: set.filename.clone(),
|
||||||
|
fullpath,
|
||||||
|
set: set.clone(),
|
||||||
|
regex: Regex::new(set.regex.as_str()).unwrap(),
|
||||||
|
wd,
|
||||||
|
watchedfiles: HashMap::new(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -60,22 +60,22 @@ pub async fn process(ctx: &Arc<Mutex<Context>>) {
|
|||||||
let begin: DateTime<Local> = Local::now().trunc_subsecs(0);
|
let begin: DateTime<Local> = Local::now().trunc_subsecs(0);
|
||||||
|
|
||||||
// wait for logs parse and zmq channel receive
|
// wait for logs parse and zmq channel receive
|
||||||
let mut received_ip = ipdatarx.recv().await.unwrap();
|
let mut recvip = ipdatarx.recv().await.unwrap();
|
||||||
|
|
||||||
// lock the context mutex
|
// lock the context mutex
|
||||||
let ctxarc = Arc::clone(&ctx);
|
let ctxarc = Arc::clone(&ctx);
|
||||||
let mut ctx = ctxarc.lock().await;
|
let mut ctx = ctxarc.lock().await;
|
||||||
|
|
||||||
if received_ip.ip == "".to_string() && received_ip.mode == "init".to_string() {
|
if recvip.ip == "".to_string() && recvip.mode == "init".to_string() {
|
||||||
for ip_to_send in &mut ctx.get_blocklist_toblock().await {
|
for sndip in &mut ctx.get_blocklist_toblock().await {
|
||||||
ip_to_send.mode = "init".to_string();
|
sndip.mode = "init".to_string();
|
||||||
send_to_ipbl_zmq(&reqsocket, ip_to_send).await;
|
send_to_ipbl_zmq(&reqsocket, sndip).await;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// refresh context blocklist
|
// refresh context blocklist
|
||||||
let filtered_ip = ctx.update_blocklist(&mut received_ip).await;
|
let filtered_ip = ctx.update_blocklist(&mut recvip).await;
|
||||||
ctx.gc_blocklist().await;
|
ctx.gc_blocklist().await;
|
||||||
|
|
||||||
// send ip list to ws and zmq sockets
|
// send ip list to ws and zmq sockets
|
||||||
@ -122,21 +122,24 @@ async fn watchfiles(ctx: &Arc<Mutex<Context>>) -> Receiver<FileEvent> {
|
|||||||
events = ctx.instance.read_events().unwrap();
|
events = ctx.instance.read_events().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
for inotifyevent in events {
|
for inevent in events {
|
||||||
let date: DateTime<Local> = Local::now().trunc_subsecs(0);
|
let date: DateTime<Local> = Local::now().trunc_subsecs(0);
|
||||||
bltx.send(FileEvent { inotifyevent, date }).await.unwrap();
|
bltx.send(FileEvent { inevent, date }).await.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
blrx
|
blrx
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_last_file_size(watchedfiles: &mut HashMap<String, u64>, path: &str) -> u64 {
|
async fn get_last_file_size(w: &mut HashMap<String, u64>, path: &str) -> u64 {
|
||||||
let currentlen = match std::fs::metadata(&path.to_string()) {
|
let currentlen = match std::fs::metadata(&path.to_string()) {
|
||||||
Ok(u) => u.len().clone(),
|
Ok(u) => u.len().clone(),
|
||||||
Err(_) => 0u64,
|
Err(e) => {
|
||||||
|
println!("{e}");
|
||||||
|
0u64
|
||||||
|
}
|
||||||
};
|
};
|
||||||
let lastlen = match watchedfiles.insert(path.to_string(), currentlen) {
|
let lastlen = match w.insert(path.to_string(), currentlen) {
|
||||||
Some(u) => u,
|
Some(u) => u,
|
||||||
None => 0,
|
None => 0,
|
||||||
};
|
};
|
||||||
@ -145,44 +148,44 @@ async fn get_last_file_size(watchedfiles: &mut HashMap<String, u64>, path: &str)
|
|||||||
|
|
||||||
async fn compare_files_changes(
|
async fn compare_files_changes(
|
||||||
ctx: &Arc<Mutex<Context>>,
|
ctx: &Arc<Mutex<Context>>,
|
||||||
inotifyrx: &mut Receiver<FileEvent>,
|
inrx: &mut Receiver<FileEvent>,
|
||||||
ipdatatx: &Sender<IpData>,
|
ipdatatx: &Sender<IpData>,
|
||||||
) {
|
) {
|
||||||
let mut trustnets;
|
let mut tnets;
|
||||||
loop {
|
loop {
|
||||||
let modifiedfiles = inotifyrx.recv().await.unwrap();
|
let modfiles = inrx.recv().await.unwrap();
|
||||||
let mut list: Vec<IpData> = vec![];
|
let mut iplist: Vec<IpData> = vec![];
|
||||||
|
|
||||||
let mut ctx = ctx.lock().await;
|
let mut ctx = ctx.lock().await;
|
||||||
trustnets = build_trustnets(&ctx.cfg.trustnets);
|
tnets = build_trustnets(&ctx.cfg.trustnets);
|
||||||
|
|
||||||
match modifiedfiles.inotifyevent.name {
|
match modfiles.inevent.name {
|
||||||
Some(name) => {
|
Some(name) => {
|
||||||
let inotify_filename = name.to_str().unwrap();
|
let filename = name.to_str().unwrap();
|
||||||
for sak in &mut ctx.clone().sas.keys() {
|
for sak in &mut ctx.clone().sas.keys() {
|
||||||
let sa = &mut ctx.sas.get_mut(sak).unwrap();
|
let sa = &mut ctx.sas.get_mut(sak).unwrap();
|
||||||
if modifiedfiles.inotifyevent.wd == sa.wd {
|
if modfiles.inevent.wd == sa.wd {
|
||||||
let handle_filename: String;
|
let handle: String;
|
||||||
if sa.filename.as_str() == "" {
|
if sa.filename.as_str() == "" {
|
||||||
handle_filename = format!("{}/{}", &sa.fullpath, inotify_filename);
|
handle = format!("{}/{}", &sa.fullpath, filename);
|
||||||
} else if inotify_filename.starts_with(sa.filename.as_str()) {
|
} else if filename.starts_with(sa.filename.as_str()) {
|
||||||
handle_filename = sa.fullpath.to_owned();
|
handle = sa.fullpath.to_owned();
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let filesize =
|
let filesize = get_last_file_size(&mut sa.watchedfiles, &handle).await;
|
||||||
get_last_file_size(&mut sa.watchedfiles, &handle_filename).await;
|
println!("{handle}, {filesize}");
|
||||||
|
|
||||||
match read_lines(&handle_filename, filesize) {
|
match read_lines(&handle, filesize) {
|
||||||
Some(lines) => {
|
Some(lines) => {
|
||||||
filter(
|
filter(
|
||||||
lines,
|
lines,
|
||||||
&mut list,
|
&mut iplist,
|
||||||
&trustnets,
|
&tnets,
|
||||||
&sa.regex,
|
&sa.regex,
|
||||||
&sa.set.src,
|
&sa.set.src,
|
||||||
&modifiedfiles.date,
|
&modfiles.date,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
None => {}
|
None => {}
|
||||||
@ -191,7 +194,7 @@ async fn compare_files_changes(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
drop(ctx);
|
drop(ctx);
|
||||||
for ip in list {
|
for ip in iplist {
|
||||||
ipdatatx.send(ip).await.unwrap();
|
ipdatatx.send(ip).await.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,13 +13,13 @@ use tokio::sync::mpsc::Sender;
|
|||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
pub struct FileEvent {
|
pub struct FileEvent {
|
||||||
pub inotifyevent: InotifyEvent,
|
pub inevent: InotifyEvent,
|
||||||
pub date: DateTime<Local>,
|
pub date: DateTime<Local>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for FileEvent {
|
impl std::fmt::Debug for FileEvent {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
write!(f, "{ie:?}", ie = self.inotifyevent)
|
write!(f, "{ie:?}", ie = self.inevent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,8 +62,8 @@ async fn send_to_ipbl_ws(ctx: &Context, ip: &mut IpData, ret: &mut Vec<String>)
|
|||||||
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
|
async fn listenpubsub(ctx: &Arc<Mutex<Context>>, txpubsub: Sender<IpData>, socket: zmq::Socket) {
|
||||||
let ctx = ctx.lock().await;
|
let ctx = ctx.lock().await;
|
||||||
let prefix = format!(
|
let prefix = format!(
|
||||||
"{subscription} ",
|
"{sub} ",
|
||||||
subscription = ctx.cfg.zmq.get("pubsub").unwrap().subscription
|
sub = ctx.cfg.zmq.get("pubsub").unwrap().subscription
|
||||||
);
|
);
|
||||||
socket
|
socket
|
||||||
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
|
.set_subscribe(ctx.cfg.zmq.get("pubsub").unwrap().subscription.as_bytes())
|
||||||
|
Loading…
Reference in New Issue
Block a user