chore: micodus_server rework #1

Open
paulbsd wants to merge 8 commits from develop into master
6 changed files with 180 additions and 92 deletions
Showing only changes of commit 3e3734befa - Show all commits

View File

@ -1,20 +1,25 @@
pub mod db; mod db;
pub mod parser; mod parser;
pub mod serve; mod serve;
//pub use db::libsql_engine::LibSQLEngine; use std::process::exit;
pub use db::sqlite_engine::SQLiteEngine;
pub use db::*; use db::sqlite_engine::SQLiteEngine;
pub use parser::*; use db::{SQLEngine::SQLite, *};
use serve::*;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let mut s: SQLEngine = SQLEngine::SQLite(SQLiteEngine::default()); println!(
"starting micodus_server version {}",
env!("CARGO_PKG_VERSION")
);
let mut s = SQLite(SQLiteEngine::default());
s.connect(); s.connect();
s.init(); s.init();
let receiver = serve::control_server().await; let receiver = control_server().await;
serve::micodus_protocol_server(receiver).await; micodus_protocol_server(receiver).await;
} }
#[allow(dead_code)] #[allow(dead_code)]
@ -23,5 +28,5 @@ fn test() {
let data: Vec<u8> = vec![0x36, 0x31, 0x33, 0x32, 0x31, 0x31, 0x38]; let data: Vec<u8> = vec![0x36, 0x31, 0x33, 0x32, 0x31, 0x31, 0x38];
let code = BcdNumber::try_from(&data as &[u8]).unwrap(); let code = BcdNumber::try_from(&data as &[u8]).unwrap();
println!("{code}"); println!("{code}");
std::process::exit(0); exit(0);
} }

51
src/old/misc.rs Normal file
View File

@ -0,0 +1,51 @@
async fn serve_old(stream: &TcpStream, recv_clone: &Arc<RwLock<Receiver<u8>>>) {
loop {
let ready = stream
.ready(Interest::READABLE | Interest::WRITABLE)
.await
.unwrap();
if ready.is_readable() {
let mut buf = vec![0; BUFSIZE];
match stream.try_read(&mut buf) {
Ok(_) => match handle(&buf) {
Some(o) => match stream.try_write(&o) {
Ok(_) => {}
Err(e) => {
println!("error: {e}");
break;
}
},
None => {
println!("none");
break;
}
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
println!("{e}");
continue;
}
Err(e) => {
println!("{e}");
break;
}
}
}
if ready.is_writable() {
let mut recv = recv_clone.write().await;
if !recv.is_empty() {
match recv.recv().await {
Some(o) => match o {
1 => {
//socket.try_write(&[o, 0x01]).unwrap();
println!("sent");
}
_ => {}
},
None => {}
}
}
}
}
}

View File

@ -578,6 +578,7 @@ macro_rules! generate_impl {
rawbody.into_iter() rawbody.into_iter()
} }
}*/ }*/
#[allow(unused)]
impl $t { impl $t {
pub fn new(rawbody: &Vec<u8>) -> Self { pub fn new(rawbody: &Vec<u8>) -> Self {
let mut res = Self::default(); let mut res = Self::default();

View File

@ -1,5 +1,8 @@
#![allow(unused_variables)] #![allow(unused_variables)]
pub struct MessageError; pub enum MessageError {
NotOurProtocolError,
BasicError,
}
impl std::fmt::Debug for MessageError { impl std::fmt::Debug for MessageError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@ -13,16 +16,23 @@ impl std::fmt::Display for MessageError {
} }
} }
pub struct NotOurProtocolError; macro_rules! errorgen {
($t:ident,$msg:expr) => {
pub struct $t;
impl std::fmt::Debug for NotOurProtocolError { impl std::fmt::Debug for $t {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "invalid protocol") write!(f, $msg)
} }
}
impl std::fmt::Display for $t {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, $msg)
}
}
};
} }
impl std::fmt::Display for NotOurProtocolError { errorgen!(NotOurProtocolError, "invalid protocol");
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { errorgen!(BasicError, "basic error");
write!(f, "invalid protocol")
}
}

View File

@ -1,13 +1,13 @@
#![allow(unused_variables)] #![allow(unused_variables)]
pub mod body; mod body;
pub mod error; mod error;
pub mod header; mod header;
pub use body::*; use body::*;
pub use error::*; use error::*;
pub use header::*; use header::*;
pub use super::db::*; use super::db::*;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -20,7 +20,7 @@ pub fn parse_inbound_msg(
let mut msg: Message = Message::default(); let mut msg: Message = Message::default();
if rawdata.first_byte() != FLAG_DELIMITER { if rawdata.first_byte() != FLAG_DELIMITER {
return Err(MessageError); return Err(MessageError::NotOurProtocolError);
} }
match msg.parse_header(rawdata) { match msg.parse_header(rawdata) {
@ -29,7 +29,7 @@ pub fn parse_inbound_msg(
} }
Err(e) => { Err(e) => {
println!("error parsing header {e}"); println!("error parsing header {e}");
return Err(MessageError); return Err(MessageError::BasicError);
} }
}; };

View File

@ -1,21 +1,28 @@
pub use crate::db::sqlite_engine::SQLiteEngine; use crate::db::sqlite_engine::SQLiteEngine;
pub use crate::db::*; use crate::db::{SQLEngine::*, *};
pub use crate::parser::*; use crate::parser::*;
use std::io; use std::net::SocketAddr;
use std::process::exit;
use std::{collections::HashMap, io, sync::Arc};
use std::sync::Arc; use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
RwLock,
};
use tokio::net::{TcpListener, TcpStream}; const MICODUS_ADDR: &'static str = "0.0.0.0";
use tokio::sync::mpsc::{channel, Receiver, Sender}; const MICODUS_PORT: u64 = 7701;
use tokio::sync::RwLock;
const ADDR: &'static str = "0.0.0.0";
const PORT: u64 = 7701;
const BUFSIZE: usize = 1024; const BUFSIZE: usize = 1024;
const CTRL_ADDR: &'static str = "127.0.0.1";
const CTRL_PORT: u64 = 7702;
pub async fn control_server() -> Receiver<u8> { pub async fn control_server() -> Receiver<u8> {
let listener = TcpListener::bind("127.0.0.1:7702").await.unwrap(); let listener = TcpListener::bind(format!("{}:{}", CTRL_ADDR, CTRL_PORT))
.await
.unwrap();
let (sender, receiver): (Sender<u8>, Receiver<u8>) = channel(100); let (sender, receiver): (Sender<u8>, Receiver<u8>) = channel(100);
tokio::spawn(async move { tokio::spawn(async move {
@ -45,86 +52,97 @@ pub async fn control_server() -> Receiver<u8> {
} }
pub async fn micodus_protocol_server(receiver: Receiver<u8>) { pub async fn micodus_protocol_server(receiver: Receiver<u8>) {
let mut sql: SQLEngine = SQLEngine::SQLite(SQLiteEngine::default()); let mut sql = SQLite(SQLiteEngine::default());
sql.connect(); sql.connect();
let arc_recv = Arc::new(RwLock::new(receiver)); let recv = Arc::new(RwLock::new(receiver));
let listen_addr_str = format!("{}:{}", MICODUS_ADDR, MICODUS_PORT);
let listener = match TcpListener::bind(format!("{}:{}", ADDR, PORT)).await { let listen_addr: SocketAddr = listen_addr_str.parse().unwrap();
Ok(o) => o,
let socket = TcpSocket::new_v4().unwrap();
socket.set_keepalive(true).unwrap();
socket.set_reuseaddr(true).unwrap();
socket.bind(listen_addr).unwrap();
let listener = match socket.listen(1024) {
Ok(l) => l,
Err(e) => { Err(e) => {
println!("error: {e}"); println!("error: {e}");
std::process::exit(1); exit(1);
} }
}; };
//let sql_ref = Arc::clone(&arc_s);
loop { loop {
let recv_clone = Arc::clone(&arc_recv); let recv = Arc::clone(&recv);
let (socket, _remote_addr) = listener.accept().await.unwrap(); let (stream, _remote_addr) = listener.accept().await.unwrap();
println!("accept");
tokio::spawn(async move { tokio::spawn(async move {
serve(&socket, &recv_clone).await; serve(&stream, &recv).await;
}); });
} }
} }
async fn serve(socket: &TcpStream, a: &Arc<RwLock<Receiver<u8>>>) { async fn serve(stream: &TcpStream, recv_clone: &Arc<RwLock<Receiver<u8>>>) {
let mut buf = vec![0; BUFSIZE];
let aa = Arc::clone(&a);
loop { loop {
let mut bb = aa.write().await; let mut recv = recv_clone.write().await;
//let terminal_id = 0;
tokio::select! { tokio::select! {
readable = socket.readable() => { o = stream.readable() => {
match readable { match o {
Ok(_) => { Ok(_) => {
match socket.try_read(&mut buf) { let mut buf = vec![0; BUFSIZE];
match stream.try_read(&mut buf) {
Ok(_) => match handle(&buf) { Ok(_) => match handle(&buf) {
Some(o) => match socket.try_write(&o) { Some(o) => match stream.try_write(&o) {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!("error: {e}"); println!("error: {e}");
break;
} }
}, },
None => { None => {
println!("none"); println!("none");
break;
} }
}, },
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => { Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock { println!("{e}");
println!("{e}"); break;
}
} }
} }}
} Err(e)=> { println!("error socket readable: {e}");}
Err(e) => {
println!("error socket readable: {e}");
}
} }
} },
b = recv.recv() => {
send = bb.recv() => { match b {
match socket.writable().await { Some(res) => {
Ok(_) => { println!("test recv {res}");
match send { },
Some(o) => match o { None => {},
1 => {
socket.try_write(&[o, 0x01]).unwrap();
println!("sent");
}
_ => {},
},
None => {
}
}
}
Err(e) => {
println!("error socket not writable: {e}")
}
} }
} },
else => break,
} }
//if ready.is_writable() {
// if !receiver.is_empty() {
// println!("test2");
// match receiver.recv().await {
// Some(o) => match o {
// 1 => {
// //socket.try_write(&[o, 0x01]).unwrap();
// println!("sent");
// }
// _ => {}
// },
// None => {}
// }
// }
//}
} }
} }
@ -137,7 +155,7 @@ fn handle(buf: &Vec<u8>) -> Option<Vec<u8>> {
//println!("raw query: {:X?}", o.to_raw()); //println!("raw query: {:X?}", o.to_raw());
match Message::store(&o) { match Message::store(&o) {
Some(log) => { Some(log) => {
let mut s: SQLEngine = SQLEngine::SQLite(SQLiteEngine::default()); let mut s = SQLite(SQLiteEngine::default());
s.connect(); s.connect();
s.insert(&log); s.insert(&log);
} }
@ -163,3 +181,6 @@ fn handle(buf: &Vec<u8>) -> Option<Vec<u8>> {
} }
} }
} }
#[allow(unused)]
pub struct ControllerMap(HashMap<String, (Sender<u8>, Receiver<u8>)>);