Lines
88.89 %
Functions
50 %
Branches
100 %
// Copyright (c) 2024, Sevki <s@sevki.io>
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#![doc(html_logo_url = "https://raw.githubusercontent.com/sevki/jetstream/main/logo/JetStream.png")]
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/sevki/jetstream/main/logo/JetStream.png"
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use {
jetstream_rpc::{Frame, Protocol},
jetstream_wireformat::WireFormat,
tokio_util::{
bytes::{self, Buf, BufMut},
codec::{Decoder, Encoder},
},
};
#[cfg(feature = "websocket")]
pub mod websocket;
pub struct ClientCodec<P>
where
P: Protocol,
{
_p: std::marker::PhantomData<P>,
}
impl<P: jetstream_rpc::Protocol> Encoder<Frame<P::Request>> for ClientCodec<P> {
type Error = std::io::Error;
fn encode(
&mut self,
item: Frame<P::Request>,
dst: &mut bytes::BytesMut,
) -> Result<(), Self::Error> {
WireFormat::encode(&item, &mut dst.writer())
impl<P: jetstream_rpc::Protocol> Decoder for ClientCodec<P> {
fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
// check to see if you have at least 4 bytes to figure out the size
if src.len() < 4 {
src.reserve(4);
return Ok(None);
let Some(mut bytz) = src.get(..4) else {
let byte_size: u32 = WireFormat::decode(&mut bytz)?;
if src.len() < byte_size as usize {
src.reserve(byte_size as usize);
Frame::<P::Response>::decode(&mut src.reader()).map(Some)
type Item = Frame<P::Response>;
impl<P> Default for ClientCodec<P>
P: jetstream_rpc::Protocol,
fn default() -> Self {
Self {
_p: std::marker::PhantomData,