1
use {
2
    echo_protocol::EchoChannel,
3
    jetstream::prelude::*,
4
    jetstream_rpc::Framed,
5
    server::service::run,
6
    std::net::{IpAddr, Ipv4Addr},
7
    turmoil::{
8
        net::{TcpListener, TcpStream},
9
        Builder,
10
    },
11
};
12

            
13
12
#[service]
14
pub trait Echo {
15
    async fn ping(&mut self) -> Result<(), Error>;
16
    async fn pong(&mut self) -> Result<(), Error>;
17
}
18

            
19
/// Field-less handler type; `Echo` is implemented for it below.
struct EchoImpl {}
20

            
21
impl Echo for EchoImpl {
22
3
    async fn ping(&mut self) -> Result<(), Error> {
23
2
        Ok(())
24
2
    }
25

            
26
    async fn pong(&mut self) -> Result<(), Error> {
27
        todo!()
28
    }
29
}
30

            
31
/// TCP port the simulated server binds and the simulated client connects to.
const PORT: u16 = 1738;
32

            
33
3
async fn bind_to_v4(port: u16) -> std::result::Result<TcpListener, std::io::Error> {
34
2
    TcpListener::bind((IpAddr::from(Ipv4Addr::UNSPECIFIED), port)).await
35
2
}
36

            
37
3
async fn bind() -> std::result::Result<TcpListener, std::io::Error> {
38
2
    bind_to_v4(PORT).await
39
2
}
40

            
41
2
fn network_partitions_during_connect() -> turmoil::Result {
42
2
    let mut sim = Builder::new().build();
43
2

            
44
3
    sim.host("server", || {
45
2
        async {
46
2
            let listener = bind().await?;
47
            loop {
48
2
                let (stream, _) = listener.accept().await?;
49
2
                let echo = EchoImpl {};
50
2
                let servercodec: jetstream::prelude::server::service::ServerCodec<
51
2
                    echo_protocol::EchoService<EchoImpl>,
52
2
                > = Default::default();
53
2
                let framed = Framed::with_capacity(stream, servercodec, 1024 * 1024 * 10);
54
2
                let mut serv = echo_protocol::EchoService { inner: echo };
55
2
                run(&mut serv, framed).await.expect("server run failed");
56
            }
57
        }
58
3
    });
59
2

            
60
3
    sim.client("client", async {
61
2
        let stream = TcpStream::connect(("server", PORT)).await?;
62
2
        let client_codec: jetstream_client::ClientCodec<EchoChannel> = Default::default();
63
2
        let mut framed = Framed::new(stream, client_codec);
64
2
        let mut chan = EchoChannel {
65
2
            inner: Box::new(&mut framed),
66
2
        };
67
2
        chan.ping().await.expect("ping failed");
68
2
        Ok(())
69
3
    });
70
2

            
71
2
    sim.run()
72
2
}
73

            
74
2
#[okstd::test]
75
fn e2e() {
76
    network_partitions_during_connect().expect("network partitions during connect failed");
77
}