JetStream Cloudflare
Let's say you've defined a service like so
use jetstream::prelude::*;
#[cfg(feature = "server")]
mod server;
#[service]
pub trait Radar {
async fn ping(
&mut self,
) -> std::result::Result<(), jetstream::prelude::Error>;
}
The glue code for running it on Cloudflare Workers is
use jetstream::cloudflare::DefaultHtmlFallback;
use worker::{event, *};
use crate::{radar_protocol, Radar};
struct RadarWorker;
impl Radar for RadarWorker {
async fn ping(
&mut self,
) -> std::result::Result<(), jetstream::prelude::Error> {
Ok(())
}
}
#[event(fetch)]
async fn fetch(
req: Request,
env: Env,
ctx: worker::Context,
) -> Result<Response> {
let handler = radar_protocol::RadarService { inner: RadarWorker };
let mut router =
jetstream::cloudflare::Router::<DefaultHtmlFallback>::new([handler]);
router.fetch(req, env, ctx).await
}
The code for connecting to it is as follows:
#![cfg(feature = "client")]
use std::time::Instant;
use argh::FromArgs;
use jetstream::{
cloudflare::JETSTREAM_PROTO_HEADER_KEY,
prelude::tracing,
websocket::tokio_tungstenite::{
connect_async, tungstenite::client::IntoClientRequest,
},
};
use jetstream_radar::{
radar_protocol::{self, RadarChannel},
Radar,
};
use reqwest::header::HeaderValue;
use url::Url;
#[derive(FromArgs, PartialEq, Debug)]
/// Top-level command.
struct Client {
#[argh(subcommand)]
nested: Nested,
}
#[derive(FromArgs, PartialEq, Debug)]
#[argh(subcommand)]
enum Nested {
Ping(Ping),
}
#[derive(FromArgs, PartialEq, Debug)]
/// First subcommand.
#[argh(subcommand, name = "ping")]
struct Ping {
#[argh(
option,
default = "Url::parse(\"wss://radar.jetstream.rs\").unwrap()"
)]
/// url to call
url: Url,
}
#[tokio::main]
async fn main() {
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install rustls crypto provider");
let args: Client = argh::from_env();
match args.nested {
Nested::Ping(web_socket) => {
tracing::info!("Connecting to {}", web_socket.url);
let mut req = web_socket.url.clone().into_client_request().unwrap();
// this is a custom header, doesn't have anything to do with websocket handshake
req.headers_mut().insert(
JETSTREAM_PROTO_HEADER_KEY,
HeaderValue::from_static(radar_protocol::PROTOCOL_VERSION),
);
tracing::info!("Attempting websocket connection...");
let (ws_stream, response) = connect_async(req).await.unwrap();
tracing::info!(
"Connected! Response status: {:?}",
response.status()
);
let mut ws_transport = jetstream::websocket::WebSocketTransport::<
RadarChannel,
>::from(ws_stream);
let mut client = radar_protocol::RadarChannel {
inner: Box::new(&mut ws_transport),
};
tracing::info!("Sending ping...");
let now = Instant::now();
match tokio::time::timeout(
std::time::Duration::from_secs(5),
client.ping(),
)
.await
{
Ok(Ok(_)) => {
println!("pong {}ms", now.elapsed().as_millis());
}
Ok(Err(e)) => tracing::error!("Ping failed: {:?}", e),
Err(_) => tracing::error!("Ping timed out after 5 seconds"),
}
tracing::info!("Client completed");
}
};
}