1
// Copyright (c) 2024, Sevki <s@sevki.io>
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
#![doc(html_logo_url = "https://raw.githubusercontent.com/sevki/jetstream/main/logo/JetStream.png")]
5
#![doc(
6
    html_favicon_url = "https://raw.githubusercontent.com/sevki/jetstream/main/logo/JetStream.png"
7
)]
8
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
9

            
10
use {
11
    jetstream_rpc::{Frame, Protocol},
12
    jetstream_wireformat::WireFormat,
13
    tokio_util::{
14
        bytes::{self, Buf, BufMut},
15
        codec::{Decoder, Encoder},
16
    },
17
};
18

            
19
#[cfg(feature = "websocket")]
20
/// Compiled only when the `websocket` cargo feature is enabled.
pub mod websocket;
21

            
22
pub struct ClientCodec<P>
23
where
24
    P: Protocol,
25
{
26
    _p: std::marker::PhantomData<P>,
27
}
28

            
29
impl<P: jetstream_rpc::Protocol> Encoder<Frame<P::Request>> for ClientCodec<P> {
30
    type Error = std::io::Error;
31

            
32
202
    fn encode(
33
202
        &mut self,
34
202
        item: Frame<P::Request>,
35
202
        dst: &mut bytes::BytesMut,
36
202
    ) -> Result<(), Self::Error> {
37
202
        WireFormat::encode(&item, &mut dst.writer())
38
202
    }
39
}
40

            
41
impl<P: jetstream_rpc::Protocol> Decoder for ClientCodec<P> {
42
    type Error = std::io::Error;
43

            
44
400
    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
45
400
        // check to see if you have at least 4 bytes to figure out the size
46
400
        if src.len() < 4 {
47
198
            src.reserve(4);
48
198
            return Ok(None);
49
202
        }
50
202
        let Some(mut bytz) = src.get(..4) else {
51
            return Ok(None);
52
        };
53

            
54
202
        let byte_size: u32 = WireFormat::decode(&mut bytz)?;
55
202
        if src.len() < byte_size as usize {
56
            src.reserve(byte_size as usize);
57
            return Ok(None);
58
202
        }
59
202
        Frame::<P::Response>::decode(&mut src.reader()).map(Some)
60
400
    }
61

            
62
    type Item = Frame<P::Response>;
63
}
64

            
65
impl<P> Default for ClientCodec<P>
66
where
67
    P: jetstream_rpc::Protocol,
68
{
69
4
    fn default() -> Self {
70
4
        Self {
71
4
            _p: std::marker::PhantomData,
72
4
        }
73
4
    }
74
}