use std::{str::*, time::Duration};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

// Textually pulls in the helpers used below (`build_bin`, `boot_bin`).
include!("./build_and_boot.rs");

/// Reads one chunk from `$r` into `$buffer` and returns it as an owned `String`.
///
/// Panics if nothing arrives within 100 ms, if the read fails, or if the bytes
/// received are not valid UTF-8. The buffer is zeroed again before returning.
///
/// NOTE(review): a single `read` is treated as one complete protocol response.
/// TCP does not guarantee that framing, so two responses sent back-to-back could
/// arrive in one read (or one response could be split) — confirm this is
/// acceptable for the server under test.
macro_rules! read_text {
    ($r: expr, $buffer: expr) => {{
        println!("<- Reading response");
        let len = tokio::time::timeout(Duration::from_millis(100), $r.read($buffer))
            .await
            .expect("timeout!")
            .expect("failed to read");
        let res = from_utf8(&$buffer[..len])
            .expect("not a string")
            .to_string();
        $buffer.fill(0);
        res
    }};
}

/// Reads one response via `read_text!` and asserts that it equals `$txt` exactly.
macro_rules! ex_txt {
    ($r: expr, $buffer: expr, $txt: expr) => {{
        // `read_text!` already zeroes the buffer, so no extra reset is needed here.
        let res = read_text!($r, $buffer);
        assert_eq!(res.as_str(), $txt);
    }};
}

/// Asserts that the next response is the `+OK` acknowledgement.
macro_rules! ex_ok {
    ($r: expr, $buffer: expr) => {
        ex_txt!($r, $buffer, "+OK\r\n")
    };
}

/// End-to-end check of connect / subscribe / publish against a freshly booted server.
///
/// Builds and boots the server binary, opens a raw TCP connection to port 4222,
/// then sends `connect`, `sub` and three `pub` commands, asserting the exact text
/// of every response.
#[tokio::test(flavor = "multi_thread")]
async fn test_connect() {
    build_bin().await;

    println!("starting nats-server");
    // Bound to a name so the returned handle stays alive until the end of the test.
    let _server = boot_bin().await;

    println!("starting nats client");
    let client = tokio::net::TcpStream::connect(("0.0.0.0", 4222))
        .await
        .unwrap();
    client.set_ttl(3).unwrap();
    let (mut r, mut w) = client.into_split();
    let mut buffer = [0; 1024];

    // Consume whatever the server sends on connect (presumably its greeting);
    // the content is not checked.
    read_text!(&mut r, &mut buffer);

    // `write_all` is used throughout: plain `write` may send only part of the
    // buffer, which would silently truncate a command.
    println!("-> Sending connect");
    w.write_all(b"connect {}\r\n").await.unwrap();
    ex_ok!(&mut r, &mut buffer);
    println!(" OK");

    println!("-> Sending SUB");
    w.write_all(b"sub foo.* 90\r\n").await.unwrap();
    ex_ok!(&mut r, &mut buffer);
    println!(" OK");

    println!("-> Sending PUB (1)");
    w.write_all(b"pub foo.bar 5\r\n").await.unwrap();
    w.write_all(b"world\r\n").await.unwrap();
    ex_ok!(&mut r, &mut buffer);
    ex_txt!(&mut r, &mut buffer, "MSG foo.bar 90 5 world\r\n");
    println!(" OK");

    // Same publish again: the subscription must keep delivering.
    println!("-> Sending PUB (2)");
    w.write_all(b"pub foo.bar 5\r\n").await.unwrap();
    w.write_all(b"world\r\n").await.unwrap();
    ex_ok!(&mut r, &mut buffer);
    ex_txt!(&mut r, &mut buffer, "MSG foo.bar 90 5 world\r\n");
    println!(" OK");

    // Publish with a reply-to subject (`home`), which is echoed in the MSG line.
    println!("-> Sending PUB REPLY-TO");
    w.write_all(b"pub foo.bar home 5\r\n").await.unwrap();
    w.write_all(b"world\r\n").await.unwrap();
    ex_ok!(&mut r, &mut buffer);
    ex_txt!(&mut r, &mut buffer, "MSG foo.bar home 90 5 world\r\n");
    println!(" OK");
}