1
use std::{net::SocketAddr, path::Path};
2

            
3
use echo_protocol::EchoChannel;
4
use jetstream::prelude::*;
5
use jetstream_macros::service;
6
use jetstream_rpc::Framed;
7
use okstd::prelude::*;
8
use s2n_quic::{client::Connect, provider::tls, Client, Server};
9

            
10
1600
#[service]
11
pub trait Echo {
12
    async fn ping(&mut self) -> Result<String, Error>;
13
}
14

            
15
struct EchoImpl {}
16

            
17
impl Echo for EchoImpl {
18
300
    async fn ping(&mut self) -> Result<String, Error> {
19
200
        eprintln!("Ping received");
20
200
        eprintln!("Pong sent");
21
200
        Ok("pong".to_string())
22
200
    }
23
}
24

            
25
pub static CA_CERT_PEM: &str =
26
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/ca-cert.pem");
27
pub static CLIENT_CERT_PEM: &str =
28
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-cert.pem");
29
pub static CLIENT_KEY_PEM: &str =
30
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-key.pem");
31
pub static SERVER_CERT_PEM: &str =
32
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-cert.pem");
33
pub static SERVER_KEY_PEM: &str =
34
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-key.pem");
35

            
36
3
async fn server() -> Result<(), Box<dyn std::error::Error>> {
37
2
    let tls = tls::default::Server::builder()
38
2
        .with_trusted_certificate(Path::new(CA_CERT_PEM))?
39
2
        .with_certificate(
40
2
            Path::new(SERVER_CERT_PEM),
41
2
            Path::new(SERVER_KEY_PEM),
42
2
        )?
43
2
        .with_client_authentication()?
44
2
        .build()?;
45

            
46
2
    let mut server = Server::builder()
47
2
        .with_tls(tls)?
48
2
        .with_io("127.0.0.1:4433")?
49
2
        .start()?;
50

            
51
4
    while let Some(mut connection) = server.accept().await {
52
2
        // spawn a new task for the connection
53
2
        tokio::spawn(async move {
54
2
            eprintln!(
55
2
                "Connection accepted from {:?}",
56
2
                connection.remote_addr()
57
2
            );
58

            
59
2
            while let Ok(Some(stream)) =
60
4
                connection.accept_bidirectional_stream().await
61
2
            {
62
2
                // spawn a new task for the stream
63
2
                tokio::spawn(async move {
64
2
                    eprintln!(
65
2
                        "Stream opened from {:?}",
66
2
                        &stream.connection().remote_addr()
67
2
                    );
68
2
                    let echo = EchoImpl {};
69
2
                    let servercodec: jetstream::prelude::server::service::ServerCodec<
70
2
                        echo_protocol::EchoService<EchoImpl>,
71
2
                    > = Default::default();
72
2
                    let framed = Framed::new(stream, servercodec);
73
2
                    let mut serv = echo_protocol::EchoService { inner: echo };
74
                    if let Err(e) =
75
2
                        server::service::run(&mut serv, framed).await
76
                    {
77
                        eprintln!("Server stream error: {:?}", e);
78
                    }
79
2
                });
80
            }
81
2
        });
82
    }
83

            
84
    Ok(())
85
}
86

            
87
3
async fn client() -> Result<(), Box<dyn std::error::Error>> {
88
2
    let tls = tls::default::Client::builder()
89
2
        .with_certificate(Path::new(CA_CERT_PEM))?
90
2
        .with_client_identity(
91
2
            Path::new(CLIENT_CERT_PEM),
92
2
            Path::new(CLIENT_KEY_PEM),
93
2
        )?
94
2
        .build()?;
95

            
96
2
    let client = Client::builder()
97
2
        .with_tls(tls)?
98
2
        .with_io("0.0.0.0:0")?
99
2
        .start()?;
100

            
101
2
    let addr: SocketAddr = "127.0.0.1:4433".parse()?;
102
2
    let connect = Connect::new(addr).with_server_name("localhost");
103
2
    let mut connection = client.connect(connect).await?;
104

            
105
    // ensure the connection doesn't time out with inactivity
106
2
    connection.keep_alive(true)?;
107

            
108
    // open a new stream and split the receiving and sending sides
109
2
    let stream = connection.open_bidirectional_stream().await?;
110
2
    let client_codec: jetstream_client::ClientCodec<EchoChannel> =
111
2
        Default::default();
112
2
    let mut framed = Framed::new(stream, client_codec);
113
2
    let mut chan = EchoChannel {
114
2
        inner: Box::new(&mut framed),
115
2
    };
116
202
    for _ in 0..100 {
117
200
        if let Err(e) = chan.ping().await {
118
            eprintln!("Ping error: {:?}", e);
119
            break;
120
200
        }
121
    }
122
    // Properly close the stream
123
2
    drop(chan);
124
2
    Ok(())
125
2
}
126

            
127
#[okstd::test]
128
#[okstd::log(debug)]
129
2
async fn echo() {
130
2
    tokio::select! {
131
2
      _ = server() => {},
132
2
      _ = client() => {},
133
    }
134
}