Compare commits

..

No commits in common. "fdca7ada0b7d3e4aeafa649b41375082e94f63e1" and "028966d539e8f13c9db9b14f1f61347e7a58d8b3" have entirely different histories.

2 changed files with 13 additions and 228 deletions

View File

@ -1,209 +0,0 @@
struct ReadState {
// rx: OwnedReadHalf,
buffer: Vec<u8>,
cursor: usize,
}
impl ReadState {
fn has_next(&self) -> bool {
self.buffer[..self.cursor]
.iter()
.position(|v| *v == b'\n')
.is_some()
}
fn shift_ramaining(&mut self) {
if !self.has_next() {
return;
}
let pos = self.cursor;
let mut last_old = 0;
let mut last_new = 0;
for (new, old) in (pos..self.buffer.len()).enumerate() {
let v = self.buffer[old];
if v == 0 {
last_old = old;
last_new = new;
break;
}
// eprintln!("new {new} old {old} = {c} ({v})", c = v as char);
self.buffer[new] = v;
self.buffer[old] = 0;
}
for old in (last_new..pos).chain(last_old..self.buffer.len()) {
// eprintln!("erase {old}");
self.buffer[old] = 0;
}
self.cursor = 0;
}
fn next(&mut self) -> Option<&[u8]> {
let Some(end) = self.buffer[self.cursor..self.buffer.len()]
.iter()
.position(|v| *v == b'\n').map(|n| n + self.cursor) else {
return None;
};
// eprintln!("cursor {} end {end}", self.cursor);
let s = &self.buffer[self.cursor..=end];
self.cursor = (end + 1).min(self.buffer.len() - 1);
Some(s)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn shift_empty() {
let mut state = ReadState {
buffer: vec![0, 0, 0, 0, 0, 0, 0, 0],
cursor: 0,
};
state.shift_ramaining();
let expected = vec![0, 0, 0, 0, 0, 0, 0, 0];
assert_eq!(
std::str::from_utf8(&state.buffer),
std::str::from_utf8(&expected),
);
}
#[test]
fn shift_single_unterminated() {
let mut state = ReadState {
buffer: vec![b'h', b'e', b'l', b'l', b'o', 0, 0, 0],
cursor: 0,
};
state.shift_ramaining();
let expected = vec![b'h', b'e', b'l', b'l', b'o', 0, 0, 0];
assert_eq!(
std::str::from_utf8(&state.buffer),
std::str::from_utf8(&expected),
);
}
#[test]
fn shift_single_terminated() {
let mut state = ReadState {
buffer: vec![b'h', b'e', b'l', b'l', b'o', b'\r', b'\n', 0],
cursor: 0,
};
state.shift_ramaining();
let expected = vec![b'h', b'e', b'l', b'l', b'o', b'\r', b'\n', 0];
assert_eq!(
std::str::from_utf8(&state.buffer),
std::str::from_utf8(&expected),
);
}
#[test]
fn shift_single_unterminated_moved() {
let mut state = ReadState {
buffer: b"hello\r\nworld\0\0\0\0".to_vec(),
cursor: 7,
};
state.shift_ramaining();
let expected = b"world\0\0\0\0\0\0\0\0\0\0\0".to_vec();
assert_eq!(
std::str::from_utf8(&state.buffer),
std::str::from_utf8(&expected),
);
}
#[test]
fn shift_single_terminated_moved() {
let mut state = ReadState {
buffer: b"hello\r\nworld\r\n\0\0\0\0\0\0\0\0".to_vec(),
cursor: 7,
};
state.shift_ramaining();
let expected = b"world\r\n\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0".to_vec();
assert_eq!(
std::str::from_utf8(&state.buffer),
std::str::from_utf8(&expected),
);
}
#[test]
fn empty_next() {
let mut state = ReadState {
buffer: b"\0\0\0\0\0\0\0\0".to_vec(),
cursor: 0,
};
assert_eq!(state.next(), None);
}
#[test]
fn depleted_next() {
let mut state = ReadState {
buffer: b"hello\0\0\0\0\0\0\0\0".to_vec(),
cursor: 5,
};
assert_eq!(state.next(), None);
}
#[test]
fn one_world_next() {
let mut state = ReadState {
buffer: b"hello\r\n\0\0\0\0\0\0\0\0".to_vec(),
cursor: 0,
};
assert_eq!(state.next(), Some(b"hello\r\n".as_slice()));
assert_eq!(state.cursor, 7);
}
#[test]
fn tree_world_next() {
let mut state = ReadState {
buffer: b"hello\r\nworld\r\nnats\r\n\0\0\0\0\0\0\0\0".to_vec(),
cursor: 0,
};
assert_eq!(state.next(), Some(b"hello\r\n".as_slice()));
assert_eq!(state.cursor, 7);
assert_eq!(state.next(), Some(b"world\r\n".as_slice()));
assert_eq!(state.cursor, 14);
assert_eq!(state.next(), Some(b"nats\r\n".as_slice()));
assert_eq!(state.cursor, 20);
assert_eq!(state.next(), None);
assert_eq!(state.cursor, 20);
}
#[test]
fn partial_move_and_shift() {
let mut state = ReadState {
buffer: b"hello\r\nworld\r\nnats\r\n\0\0\0\0\0\0\0\0".to_vec(),
cursor: 0,
};
state.next();
state.next();
state.shift_ramaining();
let expected = b"nats\r\n\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0".to_vec();
assert_eq!(
std::str::from_utf8(&state.buffer),
std::str::from_utf8(&expected),
);
}
#[test]
fn move_and_shift_non_terminated() {
let mut state = ReadState {
buffer: b"hello\r\nworld\r\nnats\0\0\0\0\0\0\0\0\0\0".to_vec(),
cursor: 0,
};
state.next();
state.next();
state.next();
state.shift_ramaining();
let expected = b"nats\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0".to_vec();
assert_eq!(
std::str::from_utf8(&state.buffer),
std::str::from_utf8(&expected),
);
}
#[test]
fn full_move_and_shift() {
let mut state = ReadState {
buffer: b"hello\r\nworld\r\nnats\r\n\0\0\0\0\0\0\0\0".to_vec(),
cursor: 0,
};
state.next();
state.next();
state.next();
assert_eq!(state.cursor, 20);
state.shift_ramaining();
let expected = b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0".to_vec();
assert_eq!(
std::str::from_utf8(&state.buffer),
std::str::from_utf8(&expected),
);
}
}

View File

@ -31,8 +31,6 @@ mod server;
use server::*;
mod error;
use error::*;
mod buffers;
use buffers::*;
#[derive(gumdrop::Options)]
struct Args {
@ -103,10 +101,6 @@ async fn handle_commands(
) {
let mut buf = Vec::with_capacity(server.max_payload);
// use futures::stream::{self, StreamExt};
// let read_stream = futures::stream::unfold(rx, |rx| {
// });
let conn_info = ConnectionInfo {
client_ip: addr.to_string(),
server_id: "asdajsdjasdjas".into(),
@ -190,6 +184,19 @@ async fn read_to_msg_end(rx: &mut OwnedReadHalf, buf: &mut Vec<u8>) -> Option<us
}
}
async fn start_stream_health_check(ctx: Context, dead_notif: Sender<()>) {
const PING_PONG_SPAN: time::Duration = time::Duration::seconds(1);
loop {
if *ctx.last_live_msg.read().await + PING_PONG_SPAN >= OffsetDateTime::now_utc() {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
continue;
} else {
dead_notif.send(()).await.unwrap();
}
break;
}
}
async fn handle_msg<'stream>(
msg: &str,
buf: &mut Vec<u8>,
@ -223,16 +230,3 @@ async fn handle_msg<'stream>(
};
Ok(())
}
async fn start_stream_health_check(ctx: Context, dead_notif: Sender<()>) {
const PING_PONG_SPAN: time::Duration = time::Duration::seconds(1);
loop {
if *ctx.last_live_msg.read().await + PING_PONG_SPAN >= OffsetDateTime::now_utc() {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
continue;
} else {
dead_notif.send(()).await.unwrap();
}
break;
}
}