// Copyright (c) 2024, Sevki <s@sevki.io>
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
#![doc(
5
    html_logo_url = "https://raw.githubusercontent.com/sevki/jetstream/main/logo/JetStream.png"
6
)]
7
#![doc(
8
    html_favicon_url = "https://raw.githubusercontent.com/sevki/jetstream/main/logo/JetStream.png"
9
)]
10
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
11

            
12
use jetstream_rpc::{Frame, Protocol};
13
use jetstream_wireformat::WireFormat;
14
use tokio_util::{
15
    bytes::{self, Buf, BufMut},
16
    codec::{Decoder, Encoder},
17
};
18

            
19
#[cfg(feature = "websocket")]
20
pub mod websocket;
21

            
22
pub struct ClientCodec<P>
23
where
24
    P: Protocol,
25
{
26
    _p: std::marker::PhantomData<P>,
27
}
28

            
29
impl<P: jetstream_rpc::Protocol> Encoder<Frame<P::Request>> for ClientCodec<P> {
30
    type Error = std::io::Error;
31

            
32
202
    fn encode(
33
202
        &mut self,
34
202
        item: Frame<P::Request>,
35
202
        dst: &mut bytes::BytesMut,
36
202
    ) -> Result<(), Self::Error> {
37
202
        WireFormat::encode(&item, &mut dst.writer())
38
202
    }
39
}
40

            
41
impl<P: jetstream_rpc::Protocol> Decoder for ClientCodec<P> {
42
    type Error = std::io::Error;
43
    type Item = Frame<P::Response>;
44

            
45
400
    fn decode(
46
400
        &mut self,
47
400
        src: &mut bytes::BytesMut,
48
400
    ) -> Result<Option<Self::Item>, Self::Error> {
49
400
        // check to see if you have at least 4 bytes to figure out the size
50
400
        if src.len() < 4 {
51
198
            src.reserve(4);
52
198
            return Ok(None);
53
202
        }
54
202
        let Some(mut bytz) = src.get(..4) else {
55
            return Ok(None);
56
        };
57

            
58
202
        let byte_size: u32 = WireFormat::decode(&mut bytz)?;
59
202
        if src.len() < byte_size as usize {
60
            src.reserve(byte_size as usize);
61
            return Ok(None);
62
202
        }
63
202
        Frame::<P::Response>::decode(&mut src.reader()).map(Some)
64
400
    }
65
}
66

            
67
impl<P> Default for ClientCodec<P>
68
where
69
    P: jetstream_rpc::Protocol,
70
{
71
4
    fn default() -> Self {
72
4
        Self {
73
4
            _p: std::marker::PhantomData,
74
4
        }
75
4
    }
76
}