diff --git a/src/main.rs b/src/main.rs index 3bce6ec..7a6b737 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,20 +1,25 @@ -pub mod db; -pub mod parser; -pub mod serve; +mod db; +mod parser; +mod serve; -//pub use db::libsql_engine::LibSQLEngine; -pub use db::sqlite_engine::SQLiteEngine; -pub use db::*; -pub use parser::*; +use std::process::exit; + +use db::sqlite_engine::SQLiteEngine; +use db::{SQLEngine::SQLite, *}; +use serve::*; #[tokio::main] async fn main() { - let mut s: SQLEngine = SQLEngine::SQLite(SQLiteEngine::default()); + println!( + "starting micodus_server version {}", + env!("CARGO_PKG_VERSION") + ); + let mut s = SQLite(SQLiteEngine::default()); s.connect(); s.init(); - let receiver = serve::control_server().await; - serve::micodus_protocol_server(receiver).await; + let receiver = control_server().await; + micodus_protocol_server(receiver).await; } #[allow(dead_code)] @@ -23,5 +28,5 @@ fn test() { let data: Vec = vec![0x36, 0x31, 0x33, 0x32, 0x31, 0x31, 0x38]; let code = BcdNumber::try_from(&data as &[u8]).unwrap(); println!("{code}"); - std::process::exit(0); + exit(0); } diff --git a/src/old/misc.rs b/src/old/misc.rs new file mode 100644 index 0000000..36a8ceb --- /dev/null +++ b/src/old/misc.rs @@ -0,0 +1,51 @@ +async fn serve_old(stream: &TcpStream, recv_clone: &Arc>>) { + loop { + let ready = stream + .ready(Interest::READABLE | Interest::WRITABLE) + .await + .unwrap(); + + if ready.is_readable() { + let mut buf = vec![0; BUFSIZE]; + match stream.try_read(&mut buf) { + Ok(_) => match handle(&buf) { + Some(o) => match stream.try_write(&o) { + Ok(_) => {} + Err(e) => { + println!("error: {e}"); + break; + } + }, + None => { + println!("none"); + break; + } + }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + println!("{e}"); + continue; + } + Err(e) => { + println!("{e}"); + break; + } + } + } + + if ready.is_writable() { + let mut recv = recv_clone.write().await; + if !recv.is_empty() { + match recv.recv().await { + Some(o) => match o { + 1 => { + //socket.try_write(&[o, 0x01]).unwrap(); + println!("sent"); + } + _ => {} + }, + None => {} + } + } + } + } +} diff --git a/src/parser/body.rs b/src/parser/body.rs index 823eb18..ec91ae5 100644 --- a/src/parser/body.rs +++ b/src/parser/body.rs @@ -578,6 +578,7 @@ macro_rules! generate_impl { rawbody.into_iter() } }*/ + #[allow(unused)] impl $t { pub fn new(rawbody: &Vec) -> Self { let mut res = Self::default(); diff --git a/src/parser/error.rs b/src/parser/error.rs index cf57574..e537d43 100644 --- a/src/parser/error.rs +++ b/src/parser/error.rs @@ -1,5 +1,8 @@ #![allow(unused_variables)] -pub struct MessageError; +pub enum MessageError { + NotOurProtocolError, + BasicError, +} impl std::fmt::Debug for MessageError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -13,16 +16,23 @@ impl std::fmt::Display for MessageError { } } -pub struct NotOurProtocolError; +macro_rules! errorgen { + ($t:ident,$msg:expr) => { + pub struct $t; -impl std::fmt::Debug for NotOurProtocolError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "invalid protocol") - } + impl std::fmt::Debug for $t { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, $msg) + } + } + + impl std::fmt::Display for $t { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, $msg) + } + } + }; } -impl std::fmt::Display for NotOurProtocolError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "invalid protocol") - } -} +errorgen!(NotOurProtocolError, "invalid protocol"); +errorgen!(BasicError, "basic error"); diff --git a/src/parser/mod.rs b/src/parser/mod.rs index a32a613..ada98d3 100644 --- a/src/parser/mod.rs +++ b/src/parser/mod.rs @@ -1,13 +1,13 @@ #![allow(unused_variables)] -pub mod body; -pub mod error; -pub mod header; +mod body; +mod error; +mod header; -pub use body::*; -pub use error::*; -pub use header::*; +use body::*; +use error::*; +use header::*; -pub use super::db::*; +use super::db::*; use std::collections::VecDeque; @@ -20,7 +20,7 @@ pub fn parse_inbound_msg( let mut msg: Message = Message::default(); if rawdata.first_byte() != FLAG_DELIMITER { - return Err(MessageError); + return Err(MessageError::NotOurProtocolError); } match msg.parse_header(rawdata) { @@ -29,7 +29,7 @@ pub fn parse_inbound_msg( } Err(e) => { println!("error parsing header {e}"); - return Err(MessageError); + return Err(MessageError::BasicError); } }; diff --git a/src/serve.rs b/src/serve.rs index 4817154..b82ed03 100644 --- a/src/serve.rs +++ b/src/serve.rs @@ -1,21 +1,28 @@ -pub use crate::db::sqlite_engine::SQLiteEngine; -pub use crate::db::*; -pub use crate::parser::*; +use crate::db::sqlite_engine::SQLiteEngine; +use crate::db::{SQLEngine::*, *}; +use crate::parser::*; -use std::io; +use std::net::SocketAddr; +use std::process::exit; +use std::{collections::HashMap, io, sync::Arc}; -use std::sync::Arc; +use tokio::net::{TcpListener, TcpSocket, TcpStream}; +use tokio::sync::{ + mpsc::{channel, Receiver, Sender}, + RwLock, +}; -use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::sync::RwLock; - -const ADDR: &'static str = "0.0.0.0"; -const PORT: u64 = 7701; +const MICODUS_ADDR: &'static str = "0.0.0.0"; +const MICODUS_PORT: u64 = 7701; const BUFSIZE: usize = 1024; +const CTRL_ADDR: &'static str = "127.0.0.1"; +const CTRL_PORT: u64 = 7702; + pub async fn control_server() -> Receiver { - let listener = TcpListener::bind("127.0.0.1:7702").await.unwrap(); + let listener = TcpListener::bind(format!("{}:{}", CTRL_ADDR, CTRL_PORT)) + .await + .unwrap(); let (sender, receiver): (Sender, Receiver) = channel(100); tokio::spawn(async move { @@ -45,86 +52,97 @@ pub async fn control_server() -> Receiver { } pub async fn micodus_protocol_server(receiver: Receiver) { - let mut sql: SQLEngine = SQLEngine::SQLite(SQLiteEngine::default()); + let mut sql = SQLite(SQLiteEngine::default()); sql.connect(); - let arc_recv = Arc::new(RwLock::new(receiver)); + let recv = Arc::new(RwLock::new(receiver)); + let listen_addr_str = format!("{}:{}", MICODUS_ADDR, MICODUS_PORT); - let listener = match TcpListener::bind(format!("{}:{}", ADDR, PORT)).await { - Ok(o) => o, + let listen_addr: SocketAddr = listen_addr_str.parse().unwrap(); + + let socket = TcpSocket::new_v4().unwrap(); + socket.set_keepalive(true).unwrap(); + socket.set_reuseaddr(true).unwrap(); + socket.bind(listen_addr).unwrap(); + + let listener = match socket.listen(1024) { + Ok(l) => l, Err(e) => { println!("error: {e}"); - std::process::exit(1); + exit(1); } }; - //let sql_ref = Arc::clone(&arc_s); loop { - let recv_clone = Arc::clone(&arc_recv); - let (socket, _remote_addr) = listener.accept().await.unwrap(); + let recv = Arc::clone(&recv); + let (stream, _remote_addr) = listener.accept().await.unwrap(); + + println!("accept"); tokio::spawn(async move { - serve(&socket, &recv_clone).await; + serve(&stream, &recv).await; }); } } -async fn serve(socket: &TcpStream, a: &Arc>>) { - let mut buf = vec![0; BUFSIZE]; - let aa = Arc::clone(&a); +async fn serve(stream: &TcpStream, recv_clone: &Arc>>) { loop { - let mut bb = aa.write().await; - //let terminal_id = 0; - + let mut recv = recv_clone.write().await; tokio::select! { - readable = socket.readable() => { - match readable { + o = stream.readable() => { + match o { Ok(_) => { - match socket.try_read(&mut buf) { + let mut buf = vec![0; BUFSIZE]; + match stream.try_read(&mut buf) { Ok(_) => match handle(&buf) { - Some(o) => match socket.try_write(&o) { + Some(o) => match stream.try_write(&o) { Ok(_) => {} Err(e) => { println!("error: {e}"); + break; } }, None => { println!("none"); + break; } }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - println!("{e}"); - } + println!("{e}"); + break; } - } - } - Err(e) => { - println!("error socket readable: {e}"); - } + }} + Err(e)=> { println!("error socket readable: {e}");} } - } - - send = bb.recv() => { - match socket.writable().await { - Ok(_) => { - match send { - Some(o) => match o { - 1 => { - socket.try_write(&[o, 0x01]).unwrap(); - println!("sent"); - } - _ => {}, - }, - None => { - } - } - } - Err(e) => { - println!("error socket not writable: {e}") - } + }, + b = recv.recv() => { + match b { + Some(res) => { + println!("test recv {res}"); + }, + None => {}, } - } + }, + else => break, } + + //if ready.is_writable() { + // if !receiver.is_empty() { + // println!("test2"); + // match receiver.recv().await { + // Some(o) => match o { + // 1 => { + // //socket.try_write(&[o, 0x01]).unwrap(); + // println!("sent"); + // } + // _ => {} + // }, + // None => {} + // } + // } + //} } } @@ -137,7 +155,7 @@ fn handle(buf: &Vec) -> Option> { //println!("raw query: {:X?}", o.to_raw()); match Message::store(&o) { Some(log) => { - let mut s: SQLEngine = SQLEngine::SQLite(SQLiteEngine::default()); + let mut s = SQLite(SQLiteEngine::default()); s.connect(); s.insert(&log); } @@ -163,3 +181,6 @@ fn handle(buf: &Vec) -> Option> { } } } + +#[allow(unused)] +pub struct ControllerMap(HashMap, Receiver)>);