1
use std::{net::SocketAddr, path::Path};
2

            
3
use echo_protocol::EchoChannel;
4
use jetstream::prelude::*;
5
use jetstream_macros::service;
6
use jetstream_rpc::{client::ClientCodec, server::run, Framed};
7
use okstd::prelude::*;
8
use s2n_quic::{client::Connect, provider::tls, Client, Server};
9

            
10
/// Example service demonstrating tracing support.
11
///
12
/// This example shows three ways to use tracing:
13
/// 1. Method-level #[instrument] attributes with custom configuration
14
/// 2. Auto-instrumentation via #[service(tracing)]
15
/// 3. No tracing for specific methods
16
#[service(tracing)] // Enable auto-instrumentation for all methods
17
pub trait Echo {
18
    /// This method has custom tracing configuration
19
    #[instrument(
20
        name = "echo_ping",
21
        skip(self),
22
        fields(
23
            message_len = message.len(),
24
        ),
25
        level = "debug"
26
    )]
27
    async fn ping(
28
        &mut self,
29
        ctx: Context,
30
        message: String,
31
    ) -> Result<String, Error>;
32

            
33
    /// This method uses default auto-instrumentation from #[service(tracing)]
34
    async fn echo(&mut self, text: String) -> Result<String, Error>;
35
}
36

            
37
struct EchoImpl {}
38

            
39
impl Echo for EchoImpl {
40
    async fn ping(
41
        &mut self,
42
        ctx: Context,
43
        message: String,
44
    ) -> Result<String, Error> {
45
        tracing::info!("Ping received: {} {:?} ", message, ctx);
46

            
47
        Ok(format!("Pong: {}", message))
48
    }
49

            
50
    async fn echo(&mut self, text: String) -> Result<String, Error> {
51
        tracing::info!("Echo received: {}", text);
52
        Ok(text)
53
    }
54
}
55

            
56
pub static CA_CERT_PEM: &str =
57
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/ca-cert.pem");
58
pub static CLIENT_CERT_PEM: &str =
59
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-cert.pem");
60
pub static CLIENT_KEY_PEM: &str =
61
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/client-key.pem");
62
pub static SERVER_CERT_PEM: &str =
63
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-cert.pem");
64
pub static SERVER_KEY_PEM: &str =
65
    concat!(env!("CARGO_MANIFEST_DIR"), "/certs/server-key.pem");
66

            
67
#[cfg(not(windows))]
68
async fn server() -> Result<(), Box<dyn std::error::Error>> {
69
    let tls = tls::default::Server::builder()
70
        .with_trusted_certificate(Path::new(CA_CERT_PEM))?
71
        .with_certificate(
72
            Path::new(SERVER_CERT_PEM),
73
            Path::new(SERVER_KEY_PEM),
74
        )?
75
        .with_client_authentication()?
76
        .build()?;
77

            
78
    let mut server = Server::builder()
79
        .with_tls(tls)?
80
        .with_io("127.0.0.1:4434")?
81
        .start()?;
82

            
83
    tracing::info!("Server listening on 127.0.0.1:4434");
84

            
85
    while let Some(mut connection) = server.accept().await {
86
        tokio::spawn(async move {
87
            tracing::info!(
88
                remote_addr = ?connection.remote_addr(),
89
                "Connection accepted"
90
            );
91

            
92
            while let Ok(Some(stream)) =
93
                connection.accept_bidirectional_stream().await
94
            {
95
                tokio::spawn(async move {
96
                    let echo = EchoImpl {};
97
                    let servercodec: jetstream::prelude::server::ServerCodec<
98
                        echo_protocol::EchoService<EchoImpl>,
99
                    > = Default::default();
100
                    let framed = Framed::new(stream, servercodec);
101
                    let mut serv = echo_protocol::EchoService { inner: echo };
102
                    run(&mut serv, framed).await.unwrap();
103
                });
104
            }
105
        });
106
    }
107

            
108
    Ok(())
109
}
110

            
111
#[cfg(not(windows))]
112
async fn client() -> Result<(), Box<dyn std::error::Error>> {
113
    let tls = tls::default::Client::builder()
114
        .with_certificate(Path::new(CA_CERT_PEM))?
115
        .with_client_identity(
116
            Path::new(CLIENT_CERT_PEM),
117
            Path::new(CLIENT_KEY_PEM),
118
        )?
119
        .build()?;
120

            
121
    let client = Client::builder()
122
        .with_tls(tls)?
123
        .with_io("0.0.0.0:0")?
124
        .start()?;
125

            
126
    let addr: SocketAddr = "127.0.0.1:4434".parse()?;
127
    let connect = Connect::new(addr).with_server_name("localhost");
128
    let mut connection = client.connect(connect).await?;
129

            
130
    connection.keep_alive(true)?;
131

            
132
    let stream = connection.open_bidirectional_stream().await?;
133
    let client_codec: ClientCodec<EchoChannel> = Default::default();
134
    let mut framed = Framed::new(stream, client_codec);
135
    let mut chan = EchoChannel {
136
        inner: Box::new(&mut framed),
137
    };
138

            
139
    tracing::info!("Sending ping...");
140
    let response = chan
141
        .ping(Context::default(), "Hello, World!".to_string())
142
        .await?;
143
    tracing::info!("Received: {}", response);
144

            
145
    tracing::info!("Sending echo...");
146
    let response = chan.echo("Echo test".to_string()).await?;
147
    tracing::info!("Received: {}", response);
148

            
149
    Ok(())
150
}
151

            
152
#[okstd::main]
153
#[cfg(not(windows))]
154
async fn main() {
155
    // Initialize tracing subscriber
156
    tracing_subscriber::fmt()
157
        .with_max_level(tracing::Level::DEBUG)
158
        .with_thread_ids(true)
159
        .with_span_events(
160
            tracing_subscriber::fmt::format::FmtSpan::ENTER
161
                | tracing_subscriber::fmt::format::FmtSpan::EXIT,
162
        )
163
        .init();
164

            
165
    tracing::info!("Starting echo service with tracing example");
166

            
167
    tokio::select! {
168
      _ = server() => {
169
          tracing::info!("Server exited");
170
      },
171
      _ = client() => {
172
          tracing::info!("Client exited");
173
      },
174
    }
175
}
176

            
177
#[cfg(windows)]
178
fn main() {}