// Coverage summary:
//   Lines:     88.89 %
//   Functions: 38.89 %
//   Branches:  100 %
use {
jetstream_rpc::{Error, Frame, Protocol, ServiceTransport},
jetstream_wireformat::WireFormat,
std::pin::pin,
tokio_util::{
bytes::{self, Buf, BufMut},
codec::{Decoder, Encoder},
},
};
/// Codec type parameterised over a [`Protocol`] `P`.
///
/// The struct carries no runtime data: its only field is a zero-sized marker
/// that ties the codec to `P`. The `Decoder` and `Encoder` impls in this file
/// use `P` to pick the request and response frame types.
pub struct ServerCodec<P>
where
    P: Protocol,
{
    /// Zero-sized marker binding this codec to protocol `P`.
    _phantom: std::marker::PhantomData<P>,
}
impl<P: Protocol> ServerCodec<P> {
pub fn new() -> Self {
Self {
_phantom: std::marker::PhantomData,
impl<P: Protocol> Default for ServerCodec<P> {
fn default() -> Self {
Self::new()
impl<P> Decoder for ServerCodec<P>
where
P: Protocol,
{
type Error = Error;
type Item = Frame<P::Request>;
fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
// check to see if you have at least 4 bytes to figure out the size
if src.len() < 4 {
src.reserve(4);
return Ok(None);
let Some(mut bytz) = src.get(..4) else {
let byte_size: u32 = WireFormat::decode(&mut bytz)?;
if src.len() < byte_size as usize {
src.reserve(byte_size as usize);
Frame::<P::Request>::decode(&mut src.reader())
.map(Some)
.map_err(|_| Error::Custom("()".to_string()))
impl<P> Encoder<Frame<P::Response>> for ServerCodec<P>
fn encode(
&mut self,
item: Frame<P::Response>,
dst: &mut bytes::BytesMut,
) -> Result<(), Self::Error> {
item.encode(&mut dst.writer())
.map(|_| ())
pub async fn run<T, P>(p: &mut P, mut stream: T) -> Result<(), P::Error>
T: ServiceTransport<P>,
use futures::{SinkExt, StreamExt};
let mut a = pin!(p);
while let Some(Ok(frame)) = stream.next().await {
stream.send(a.rpc(frame).await?).await?
Ok(())