Compare commits
2 Commits
028966d539
...
fdca7ada0b
Author | SHA1 | Date | |
---|---|---|---|
fdca7ada0b | |||
da80468266 |
209
src/buffers.rs
Normal file
209
src/buffers.rs
Normal file
@ -0,0 +1,209 @@
|
|||||||
|
struct ReadState {
|
||||||
|
// rx: OwnedReadHalf,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
cursor: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReadState {
|
||||||
|
fn has_next(&self) -> bool {
|
||||||
|
self.buffer[..self.cursor]
|
||||||
|
.iter()
|
||||||
|
.position(|v| *v == b'\n')
|
||||||
|
.is_some()
|
||||||
|
}
|
||||||
|
fn shift_ramaining(&mut self) {
|
||||||
|
if !self.has_next() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let pos = self.cursor;
|
||||||
|
let mut last_old = 0;
|
||||||
|
let mut last_new = 0;
|
||||||
|
for (new, old) in (pos..self.buffer.len()).enumerate() {
|
||||||
|
let v = self.buffer[old];
|
||||||
|
if v == 0 {
|
||||||
|
last_old = old;
|
||||||
|
last_new = new;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// eprintln!("new {new} old {old} = {c} ({v})", c = v as char);
|
||||||
|
self.buffer[new] = v;
|
||||||
|
self.buffer[old] = 0;
|
||||||
|
}
|
||||||
|
for old in (last_new..pos).chain(last_old..self.buffer.len()) {
|
||||||
|
// eprintln!("erase {old}");
|
||||||
|
self.buffer[old] = 0;
|
||||||
|
}
|
||||||
|
self.cursor = 0;
|
||||||
|
}
|
||||||
|
fn next(&mut self) -> Option<&[u8]> {
|
||||||
|
let Some(end) = self.buffer[self.cursor..self.buffer.len()]
|
||||||
|
.iter()
|
||||||
|
.position(|v| *v == b'\n').map(|n| n + self.cursor) else {
|
||||||
|
return None;
|
||||||
|
};
|
||||||
|
// eprintln!("cursor {} end {end}", self.cursor);
|
||||||
|
let s = &self.buffer[self.cursor..=end];
|
||||||
|
self.cursor = (end + 1).min(self.buffer.len() - 1);
|
||||||
|
Some(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn shift_empty() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: vec![0, 0, 0, 0, 0, 0, 0, 0],
|
||||||
|
cursor: 0,
|
||||||
|
};
|
||||||
|
state.shift_ramaining();
|
||||||
|
let expected = vec![0, 0, 0, 0, 0, 0, 0, 0];
|
||||||
|
assert_eq!(
|
||||||
|
std::str::from_utf8(&state.buffer),
|
||||||
|
std::str::from_utf8(&expected),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn shift_single_unterminated() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: vec![b'h', b'e', b'l', b'l', b'o', 0, 0, 0],
|
||||||
|
cursor: 0,
|
||||||
|
};
|
||||||
|
state.shift_ramaining();
|
||||||
|
let expected = vec![b'h', b'e', b'l', b'l', b'o', 0, 0, 0];
|
||||||
|
assert_eq!(
|
||||||
|
std::str::from_utf8(&state.buffer),
|
||||||
|
std::str::from_utf8(&expected),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn shift_single_terminated() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: vec![b'h', b'e', b'l', b'l', b'o', b'\r', b'\n', 0],
|
||||||
|
cursor: 0,
|
||||||
|
};
|
||||||
|
state.shift_ramaining();
|
||||||
|
let expected = vec![b'h', b'e', b'l', b'l', b'o', b'\r', b'\n', 0];
|
||||||
|
assert_eq!(
|
||||||
|
std::str::from_utf8(&state.buffer),
|
||||||
|
std::str::from_utf8(&expected),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn shift_single_unterminated_moved() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: b"hello\r\nworld\0\0\0\0".to_vec(),
|
||||||
|
cursor: 7,
|
||||||
|
};
|
||||||
|
state.shift_ramaining();
|
||||||
|
let expected = b"world\0\0\0\0\0\0\0\0\0\0\0".to_vec();
|
||||||
|
assert_eq!(
|
||||||
|
std::str::from_utf8(&state.buffer),
|
||||||
|
std::str::from_utf8(&expected),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn shift_single_terminated_moved() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: b"hello\r\nworld\r\n\0\0\0\0\0\0\0\0".to_vec(),
|
||||||
|
cursor: 7,
|
||||||
|
};
|
||||||
|
state.shift_ramaining();
|
||||||
|
let expected = b"world\r\n\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0".to_vec();
|
||||||
|
assert_eq!(
|
||||||
|
std::str::from_utf8(&state.buffer),
|
||||||
|
std::str::from_utf8(&expected),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn empty_next() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: b"\0\0\0\0\0\0\0\0".to_vec(),
|
||||||
|
cursor: 0,
|
||||||
|
};
|
||||||
|
assert_eq!(state.next(), None);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn depleted_next() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: b"hello\0\0\0\0\0\0\0\0".to_vec(),
|
||||||
|
cursor: 5,
|
||||||
|
};
|
||||||
|
assert_eq!(state.next(), None);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn one_world_next() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: b"hello\r\n\0\0\0\0\0\0\0\0".to_vec(),
|
||||||
|
cursor: 0,
|
||||||
|
};
|
||||||
|
assert_eq!(state.next(), Some(b"hello\r\n".as_slice()));
|
||||||
|
assert_eq!(state.cursor, 7);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn tree_world_next() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: b"hello\r\nworld\r\nnats\r\n\0\0\0\0\0\0\0\0".to_vec(),
|
||||||
|
cursor: 0,
|
||||||
|
};
|
||||||
|
assert_eq!(state.next(), Some(b"hello\r\n".as_slice()));
|
||||||
|
assert_eq!(state.cursor, 7);
|
||||||
|
assert_eq!(state.next(), Some(b"world\r\n".as_slice()));
|
||||||
|
assert_eq!(state.cursor, 14);
|
||||||
|
assert_eq!(state.next(), Some(b"nats\r\n".as_slice()));
|
||||||
|
assert_eq!(state.cursor, 20);
|
||||||
|
assert_eq!(state.next(), None);
|
||||||
|
assert_eq!(state.cursor, 20);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn partial_move_and_shift() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: b"hello\r\nworld\r\nnats\r\n\0\0\0\0\0\0\0\0".to_vec(),
|
||||||
|
cursor: 0,
|
||||||
|
};
|
||||||
|
state.next();
|
||||||
|
state.next();
|
||||||
|
state.shift_ramaining();
|
||||||
|
let expected = b"nats\r\n\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0".to_vec();
|
||||||
|
assert_eq!(
|
||||||
|
std::str::from_utf8(&state.buffer),
|
||||||
|
std::str::from_utf8(&expected),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn move_and_shift_non_terminated() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: b"hello\r\nworld\r\nnats\0\0\0\0\0\0\0\0\0\0".to_vec(),
|
||||||
|
cursor: 0,
|
||||||
|
};
|
||||||
|
state.next();
|
||||||
|
state.next();
|
||||||
|
state.next();
|
||||||
|
state.shift_ramaining();
|
||||||
|
let expected = b"nats\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0".to_vec();
|
||||||
|
assert_eq!(
|
||||||
|
std::str::from_utf8(&state.buffer),
|
||||||
|
std::str::from_utf8(&expected),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
fn full_move_and_shift() {
|
||||||
|
let mut state = ReadState {
|
||||||
|
buffer: b"hello\r\nworld\r\nnats\r\n\0\0\0\0\0\0\0\0".to_vec(),
|
||||||
|
cursor: 0,
|
||||||
|
};
|
||||||
|
state.next();
|
||||||
|
state.next();
|
||||||
|
state.next();
|
||||||
|
assert_eq!(state.cursor, 20);
|
||||||
|
state.shift_ramaining();
|
||||||
|
let expected = b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0".to_vec();
|
||||||
|
assert_eq!(
|
||||||
|
std::str::from_utf8(&state.buffer),
|
||||||
|
std::str::from_utf8(&expected),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
32
src/main.rs
32
src/main.rs
@ -31,6 +31,8 @@ mod server;
|
|||||||
use server::*;
|
use server::*;
|
||||||
mod error;
|
mod error;
|
||||||
use error::*;
|
use error::*;
|
||||||
|
mod buffers;
|
||||||
|
use buffers::*;
|
||||||
|
|
||||||
#[derive(gumdrop::Options)]
|
#[derive(gumdrop::Options)]
|
||||||
struct Args {
|
struct Args {
|
||||||
@ -101,6 +103,10 @@ async fn handle_commands(
|
|||||||
) {
|
) {
|
||||||
let mut buf = Vec::with_capacity(server.max_payload);
|
let mut buf = Vec::with_capacity(server.max_payload);
|
||||||
|
|
||||||
|
// use futures::stream::{self, StreamExt};
|
||||||
|
// let read_stream = futures::stream::unfold(rx, |rx| {
|
||||||
|
// });
|
||||||
|
|
||||||
let conn_info = ConnectionInfo {
|
let conn_info = ConnectionInfo {
|
||||||
client_ip: addr.to_string(),
|
client_ip: addr.to_string(),
|
||||||
server_id: "asdajsdjasdjas".into(),
|
server_id: "asdajsdjasdjas".into(),
|
||||||
@ -184,19 +190,6 @@ async fn read_to_msg_end(rx: &mut OwnedReadHalf, buf: &mut Vec<u8>) -> Option<us
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn start_stream_health_check(ctx: Context, dead_notif: Sender<()>) {
|
|
||||||
const PING_PONG_SPAN: time::Duration = time::Duration::seconds(1);
|
|
||||||
loop {
|
|
||||||
if *ctx.last_live_msg.read().await + PING_PONG_SPAN >= OffsetDateTime::now_utc() {
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
dead_notif.send(()).await.unwrap();
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_msg<'stream>(
|
async fn handle_msg<'stream>(
|
||||||
msg: &str,
|
msg: &str,
|
||||||
buf: &mut Vec<u8>,
|
buf: &mut Vec<u8>,
|
||||||
@ -230,3 +223,16 @@ async fn handle_msg<'stream>(
|
|||||||
};
|
};
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn start_stream_health_check(ctx: Context, dead_notif: Sender<()>) {
|
||||||
|
const PING_PONG_SPAN: time::Duration = time::Duration::seconds(1);
|
||||||
|
loop {
|
||||||
|
if *ctx.last_live_msg.read().await + PING_PONG_SPAN >= OffsetDateTime::now_utc() {
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
dead_notif.send(()).await.unwrap();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user