Description

This example shows how to use a WebSocket to show the live status of the Shuttle API on a web page. The app also provides an echo service and notifies clients when the number of connected users changes.

You can clone the example below by running the following (you’ll need cargo-shuttle installed):

cargo shuttle init --from shuttle-hq/shuttle-examples \
 --subfolder actix-web/websocket-actorless

Code

use actix_files::NamedFile;
use actix_web::{
    web::{self, ServiceConfig},
    HttpRequest, HttpResponse, Responder,
};
use actix_ws::Message;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use serde::Serialize;
use shuttle_actix_web::ShuttleActixWeb;
use std::{
    sync::{atomic::AtomicUsize, Arc},
    time::Duration,
};
use tokio::sync::{mpsc, watch};

/// Number of seconds the background checker sleeps between two API status polls.
const PAUSE_SECS: u64 = 15;
/// URL that is requested to decide whether the API is up.
const STATUS_URI: &str = "https://api.shuttle.rs";

/// Shared state handed to the `/ws` handler.
///
/// * `.0` — sender on which websocket sessions report connect/disconnect events.
/// * `.1` — receiver that observes the most recent [`ApiStateMessage`].
type AppState = (
    mpsc::UnboundedSender<WsState>,
    watch::Receiver<ApiStateMessage>,
);

/// Lifecycle event a websocket session sends to the client-counting task.
#[derive(Debug, Clone)]
enum WsState {
    /// A websocket session was opened; the counter is incremented.
    Connected,
    /// A websocket session ended; the counter is decremented.
    Disconnected,
}

/// Status snapshot that is serialized to JSON and pushed to websocket clients.
#[derive(Serialize, Default, Clone, Debug)]
struct ApiStateMessage {
    /// Value of the client counter when the snapshot was built.
    client_count: usize,
    /// Label naming the task that produced the snapshot
    /// (`"api_update loop"` or `"ws_update"`).
    origin: String,
    /// Time at which the snapshot was built.
    date_time: DateTime<Utc>,
    /// Result of the status request made for this snapshot.
    is_up: bool,
}

/// Drives one websocket session: answers pings and echoes text frames back.
///
/// The loop ends when the stream is exhausted or yields an error, when any
/// other kind of message arrives, or when writing to the session fails.
/// Whatever the reason, a [`WsState::Disconnected`] event is then sent on
/// `tx` so the client counter stays in sync, and the session is closed.
///
/// * `session` — handle used to write frames to the client.
/// * `msg_stream` — incoming frames from the client.
/// * `tx` — channel on which the disconnect event is reported.
async fn echo_handler(
    mut session: actix_ws::Session,
    mut msg_stream: actix_ws::MessageStream,
    tx: mpsc::UnboundedSender<WsState>,
) {
    while let Some(Ok(msg)) = msg_stream.next().await {
        match msg {
            Message::Ping(bytes) => {
                // `break` rather than `return`: the disconnect event below must
                // still be sent, otherwise the client counter never goes down.
                if session.pong(&bytes).await.is_err() {
                    break;
                }
            }
            Message::Text(s) => {
                // A failed write means the session is already closed; leave the
                // loop instead of panicking inside the spawned task.
                if session.text(s.clone()).await.is_err() {
                    break;
                }
                tracing::info!("Got text, {}", s);
            }
            // Every other message kind ends the session.
            _ => break,
        }
    }

    if let Err(e) = tx.send(WsState::Disconnected) {
        tracing::error!("Failed to send disconnected state: {e:?}");
    }

    // Best effort: the session may already be closed.
    let _ = session.close(None).await;
}

async fn websocket(
    req: HttpRequest,
    body: web::Payload,
    app_state: web::Data<AppState>,
) -> actix_web::Result<HttpResponse> {
    let app_state = app_state.into_inner();
    let (response, session, msg_stream) = actix_ws::handle(&req, body)?;

    let tx_ws_state = app_state.0.clone();
    let tx_ws_state2 = tx_ws_state.clone();

    // send connected state
    if let Err(e) = tx_ws_state.send(WsState::Connected) {
        tracing::error!("Failed to send connected state: {e:?}");
    }

    // listen for api state changes
    let mut session_clone = session.clone();
    let mut rx_api_state = app_state.1.clone();
    actix_web::rt::spawn(async move {
        // adding some delay to avoid getting the first message too soon.
        tokio::time::sleep(Duration::from_millis(500)).await;
        while rx_api_state.changed().await.is_ok() {
            let msg = rx_api_state.borrow().clone();
            tracing::info!("Handling ApiStateMessage: {msg:?}");
            let msg = serde_json::to_string(&msg).unwrap();
            session_clone.text(msg).await.unwrap();
        }
    });

    // echo handler
    actix_web::rt::spawn(echo_handler(session, msg_stream, tx_ws_state2));
    Ok(response)
}

async fn index() -> impl Responder {
    NamedFile::open_async("./static/index.html")
        .await
        .map_err(|e| actix_web::error::ErrorInternalServerError(e))
}

/// Application entry point: starts the background tasks and builds the routes.
///
/// Two tokio tasks are spawned:
/// * a poller that checks the API status every `PAUSE_SECS` seconds, and
/// * a counter that consumes websocket connect/disconnect events.
///
/// Both publish an [`ApiStateMessage`] through the same watch channel. The
/// returned service configuration serves the index page on `/` and the
/// websocket endpoint on `/ws`.
#[shuttle_runtime::main]
async fn actix_web(
) -> ShuttleActixWeb<impl FnOnce(&mut ServiceConfig) + Send + Clone + 'static> {
    // Channels carry data between the background tasks and the websocket handlers.
    // api state channel
    let (tx_api_state, rx_api_state) = watch::channel(ApiStateMessage::default());
    // websocket state channel
    let (tx_ws_state, mut rx_ws_state) = mpsc::unbounded_channel::<WsState>();

    // create a shared state for the client counter
    let client_count = Arc::new(AtomicUsize::new(0));
    let client_count2 = Arc::clone(&client_count);

    // share tx_api_state between both background tasks
    let shared_tx_api_state = Arc::new(tx_api_state);
    let shared_tx_api_state2 = Arc::clone(&shared_tx_api_state);

    // share reqwest client
    let client = reqwest::Client::default();
    let client2 = client.clone();

    // Spawn a task to continually check the status of the api
    tokio::spawn(async move {
        let duration = Duration::from_secs(PAUSE_SECS);

        loop {
            tokio::time::sleep(duration).await;
            let is_up = get_api_status(&client).await;

            let response = ApiStateMessage {
                client_count: client_count.load(std::sync::atomic::Ordering::SeqCst),
                origin: "api_update loop".to_string(),
                date_time: Utc::now(),
                is_up,
            };

            // Stop polling when the send fails.
            if shared_tx_api_state.send(response).is_err() {
                tracing::error!("Failed to send api state from checker thread");
                break;
            }
        }
    });

    // Spawn a task that tracks websocket connections and publishes the new count
    tokio::spawn(async move {
        while let Some(state) = rx_ws_state.recv().await {
            match state {
                WsState::Connected => {
                    tracing::info!("Client connected");
                    client_count2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                }
                WsState::Disconnected => {
                    tracing::info!("Client disconnected");
                    client_count2.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
                }
            }

            let client_count = client_count2.load(std::sync::atomic::Ordering::SeqCst);
            tracing::info!("Client count: {client_count}");

            let is_up = get_api_status(&client2).await;

            if let Err(e) = shared_tx_api_state2.send(ApiStateMessage {
                client_count,
                origin: "ws_update".to_string(),
                date_time: Utc::now(),
                is_up,
            }) {
                tracing::error!("Failed to send api state: {e:?}");
            }
        }
    });

    let app_state = web::Data::new((tx_ws_state, rx_api_state));

    let config = move |cfg: &mut ServiceConfig| {
        cfg.service(web::resource("/").route(web::get().to(index)))
            .service(
                web::resource("/ws")
                    .app_data(app_state)
                    .route(web::get().to(websocket)),
            );
    };
    Ok(config.into())
}

/// Reports whether a GET request to `STATUS_URI` could be sent successfully.
///
/// Only the outcome of sending the request is examined; the HTTP status code
/// of the response is not inspected.
async fn get_api_status(client: &reqwest::Client) -> bool {
    client.get(STATUS_URI).send().await.is_ok()
}

Usage

Once you’ve cloned the example, launch it locally using cargo shuttle run and then go to http://localhost:8000. You should see a status page. If you open your browser’s developer tools (Inspect/Chrome DevTools, depending on which browser you’re using) and go to the Network tab, you’ll see that your browser received an HTTP status code of 101 (Switching Protocols) for the WebSocket connection.


If you want to explore other frameworks, we have more examples with popular ones like Tower and Warp. You can find them right here.

Be sure to check out the examples repo for many more examples!