bazzar/crates/channels/src/rpc.rs

41 lines
1.5 KiB
Rust
Raw Normal View History

2022-11-05 01:08:45 +01:00
use std::net::{IpAddr, Ipv4Addr};
use futures::StreamExt;
use tarpc::server;
use tarpc::server::incoming::Incoming;
use tarpc::server::{Channel, Serve};
use tarpc::tokio_serde::formats::Bincode;
/// Starts a tarpc server listening on `127.0.0.1:<port>` and serves it until the
/// stream of incoming connections ends.
///
/// Connections are framed with the Bincode codec. For every accepted connection a
/// fresh server instance is obtained by calling `build` and the channel is executed
/// with it.
///
/// # Parameters
/// * `name` - human-readable service name, used only in log and panic messages.
/// * `port` - TCP port to bind on the IPv4 loopback address.
/// * `build` - factory invoked once per accepted channel to produce the request handler.
///
/// # Panics
/// Panics if the listener cannot be bound (for example when the port is already in use).
pub async fn start<Server, Req, Build>(name: &str, port: u16, build: Build)
where
    Server: Serve<Req> + Send + 'static + Clone,
    Build: Fn() -> Server,
    <Server as Serve<Req>>::Fut: Send,
    <Server as Serve<Req>>::Resp: serde::Serialize + Send + 'static,
    Req: Send + 'static,
    Req: for<'l> serde::Deserialize<'l>,
{
    /// Maximum number of simultaneously open channels per client IP.
    const MAX_CHANNELS_PER_IP: u32 = 8;
    /// Maximum number of in-flight requests on a single channel.
    const MAX_REQUESTS_PER_CHANNEL: usize = 20;
    /// Maximum number of channels driven concurrently by this server.
    const MAX_CONCURRENT_CHANNELS: usize = 10;

    let server_addr = (IpAddr::V4(Ipv4Addr::LOCALHOST), port);
    let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Bincode::default)
        .await
        // Binding is a start-up precondition; keep failing hard, but say what failed.
        .unwrap_or_else(|err| {
            panic!(
                "Failed to bind {} rpc listener on {}:{}: {}",
                name,
                Ipv4Addr::LOCALHOST,
                port,
                err
            )
        });
    tracing::info!("Starting {} rpc at {}", name, listener.local_addr());
    // Raise the frame-length limit to the largest representable value.
    listener.config_mut().max_frame_length(usize::MAX);
    listener
        // Ignore accept errors.
        .filter_map(|r| futures::future::ready(r.ok()))
        .map(server::BaseChannel::with_defaults)
        // Limit channels per IP. A connection whose peer address can no longer be
        // read (e.g. it was reset right after being accepted) must not panic the
        // whole accept loop, so such connections share the unspecified-address key.
        .max_channels_per_key(MAX_CHANNELS_PER_IP, |channel| {
            channel
                .transport()
                .peer_addr()
                .map_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED), |addr| addr.ip())
        })
        .max_concurrent_requests_per_channel(MAX_REQUESTS_PER_CHANNEL)
        // Each channel gets its own handler produced by the caller-supplied factory.
        .map(|channel| channel.execute(build()))
        // Cap the number of channels served at the same time.
        .buffer_unordered(MAX_CONCURRENT_CHANNELS)
        .for_each(|_| async {})
        .await;
    tracing::info!("RPC channel closed");
}