When I was starting out in Rust development in 2023, I wanted to do something ambitious that would help me learn the language. This was partly due to the complexity of my tasking at work, but it was also due to my love for the language - what it stood for and why it was created.
One of the best ways to get familiar with a programming language is to create a project in it. You rarely learn without being challenged, and a project helps you build familiarity with the language's quirks and features. Given that I know a bit of networking (otherwise, this website wouldn't be up!), I thought to myself: why not build an HTTP server in Rust, without additional tooling? Building an HTTP server in a new language challenges you on several fronts at once - parsing, I/O, and concurrency - while forcing you to learn the language's idioms.
So, I went to work, and I was able to get an HTTP server working that served basic files. But then came the big question: now what? A few months ago, I answered it.
Typically, HTTP only works one way - you request a server asset, the server replies, and that connection is closed. Web sockets are a live, persistent connection between a client and a server, meaning the server can stream data back to the client whenever it wants to. This is useful for real-time data - weather events, financial data, and chat programs. Without web sockets, you would need to refresh the web page every time to see updates, which is tedious.
Web sockets are really simple on the backend. They operate on the same channels as an HTTP request: an initial request is made, and my Rust backend takes the socket. But rather than sending back data and closing the socket, my backend replies "hey! I got your request to start a keep-alive connection", then... keeps the session alive in an infinite loop.
While this session is alive in an infinite loop, I needed to figure out how to do two things: keep track of every active session, and send data between those sessions.
In languages like Python or C++, this would be easy: I would have a global state guarded by mutexes, and I would add each web socket session to that global state. With Rust, however, this wasn't going to work. Safe mutable global state in Rust is notoriously hard to implement correctly, so I needed to find a different approach. The approach I used was channels.
Channels are a construct Rust borrows (pun intended) from Go. One of Go's popular slogans is "do not communicate by sharing memory; instead, share memory by communicating". In other words, for a global state to exist, there should be one dedicated task that manages that state, and a way for everything else to talk to it. The managing task is then the only thing touching the state, with commands to modify it arriving over channels from everyone else. In my web socket implementation, I used a tokio::sync::mpsc channel: the dedicated task managing my web socket sessions is the single consumer, and the contexts the web sockets run in are the many producers sending it commands. Here is a snippet better explaining this concept:
// Web socket channel
let (tx, mut rx): (
    UnboundedSender<WebSocketCommands>,
    UnboundedReceiver<WebSocketCommands>,
) = mpsc::unbounded_channel();

// Web socket task. It stores uuids as keys and a channel to contact each
// active session as values.
tokio::spawn(async move {
    let mut socket_session: WebSockets = WebSockets::default();
    while let Some(command) = rx.recv().await {
        match command {
            WebSocketCommands::WebSocketAddConn(uuid, sender) => {
                socket_session.add_session(uuid, sender).await
            }
            WebSocketCommands::WebSocketRemoveConn(uuid) => {
                socket_session.remove_session(uuid).await
            }
            WebSocketCommands::WebSocketBroadcast(items) => {
                socket_session.broadcast(items).await
            }
            WebSocketCommands::WebSocketBroadcastExcept(uuid, items) => {
                socket_session.broadcast_except(uuid, items).await
            }
        }
    }
});
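As a rough illustration, the state owner those commands drive might look something like this. This is a hedged sketch, not my actual implementation: it uses u64 ids and std::sync::mpsc senders in place of the real uuids and tokio channels so it stands alone (the real methods are also async).

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

// Stand-in for the real WebSockets state. The real version keys on
// uuid::Uuid and holds tokio senders; u64 keys and std senders are
// assumptions made so this sketch compiles on its own.
#[derive(Default)]
struct WebSockets {
    sessions: HashMap<u64, Sender<Vec<u8>>>,
}

impl WebSockets {
    fn add_session(&mut self, id: u64, sender: Sender<Vec<u8>>) {
        self.sessions.insert(id, sender);
    }
    fn remove_session(&mut self, id: u64) {
        self.sessions.remove(&id);
    }
    fn broadcast(&self, payload: Vec<u8>) {
        for sender in self.sessions.values() {
            let _ = sender.send(payload.clone()); // ignore closed sessions
        }
    }
    fn broadcast_except(&self, skip: u64, payload: Vec<u8>) {
        for (id, sender) in &self.sessions {
            if *id != skip {
                let _ = sender.send(payload.clone());
            }
        }
    }
}

fn main() {
    let mut state = WebSockets::default();
    let (tx_a, rx_a) = channel();
    let (tx_b, rx_b) = channel();
    state.add_session(1, tx_a);
    state.add_session(2, tx_b);

    // Session 1 speaks; everyone except session 1 hears it.
    state.broadcast_except(1, b"hello".to_vec());
    assert!(rx_a.try_recv().is_err());
    assert_eq!(rx_b.try_recv().unwrap(), b"hello".to_vec());
    println!("broadcast_except ok");
}
```

The key property is that nothing outside the owning task ever touches the HashMap directly - all mutation flows through the command channel.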
To wire this up, I create a transmitter and receiver, and pass the receiver to the web socket task. There are four commands: add a web socket, remove a web socket, broadcast a payload to all connected sockets, and broadcast a payload to all sockets except the one identified by the uuid. Internally, this state management is simple - a HashMap that looks up a socket's channel by its uuid key. I create these uuid keys when I set up the web socket sessions, something I will showcase below:
pub async fn handle_web_sockets(
    request: HTTPRequest,
    mut tcp_handler: BufReader<TcpStream>,
    sender: UnboundedSender<WebSocketCommands>,
) {
    // Get the key and fail if it does not exist
    let key = match request.0.web_socket_key {
        Some(x) => x,
        None => {
            tcp_handler
                .write_all(&HTTPResponses::not_found().to_response())
                .await
                .unwrap();
            return;
        }
    };
    let response = WebSocketHandShake::handshake(&key);
    // Send a response to the client saying we got the web socket
    tcp_handler
        .write_all(&response.to_response())
        .await
        .unwrap();
    let uuid = uuid::Uuid::new_v4();
    // Web socket channel for this session
    let (tx, mut rx): (UnboundedSender<Vec<u8>>, UnboundedReceiver<Vec<u8>>) = unbounded_channel();
    ...
As you can see, I create a uuid to associate with each web socket session, so I never have to pass the socket itself back and forth to the web socket task - just the uuid. Getting to this point was... interesting, and required a lot of thinking. I originally wanted to send the whole TCP socket (BufReader<TcpStream>) over to the web socket task while still maintaining ownership in the handle_web_sockets function. That required wrapping the BufReader in an Arc<Mutex<>>, which added a lot of overhead. Worse, this approach didn't work: I needed to read from the BufReader in handle_web_sockets while the dedicated task wrote to it, and the messages I sent arrived delayed. I was awaiting the BufReader in handle_web_sockets while the dedicated web socket task was also awaiting a write to it, and the two contended. Here is what I mean:
Type and send "aaaaaa" => does nothing
Type and send "bbbbbb" => displays "aaaaaa"
See how the messages above are staggered? That wasn't the result I wanted - I wanted to see "aaaaaa" displayed the moment I hit enter. I also wanted other sessions to see it, which wasn't working either. Back to the drawing board!
The second approach was to relinquish ownership of the Mutex<BufReader<TcpStream>> completely. I still needed the Mutex wrapper, as the socket could potentially be read from and written to concurrently, but since I was no longer sharing ownership, I could discard the Arc wrapper. Because handle_web_sockets still had to read from this socket, I added a command to the dedicated web socket task that reads from a socket: I would send the command and ask the task to read data from the socket identified by that session's uuid.
This approach was better and cleaner than the last one, but it still wasn't working properly. During testing, only the first session that interacted with the socket server would work; the other sessions couldn't connect. Now... I'm not 100% confident what the issue was here, but my guess is that the first session was flooding the web socket task with read requests, so it never got around to the other sessions.
The third approach was to keep the BufReader<TcpStream> in the handle_web_sockets function, but create a function-scoped UnboundedSender<>/UnboundedReceiver<> pair that lets the dedicated web socket task communicate back to each running handle_web_sockets invocation. The web socket task then sends a payload of bytes through the UnboundedSender<> instead of writing to a BufReader<TcpStream> directly.
Using this approach, I didn't have to deal with mutexes at all, but I was still struggling to get messages to display in real time. Again, I'm not 100% confident in the core problem, but it most likely came down to awaiting reads and writes on the socket sequentially, plus other asynchronous black magic in Rust. I tried a couple of different approaches, but ultimately settled on tokio's select! macro, which listens for a broadcast and for incoming data at the same time, runs whichever operation is ready first, and cancels the other. This made me realize that await points are sequential: without select!, I would pause waiting for a broadcast or waiting to read from the socket, when what I actually wanted was whichever operation came first.
This was a lot to explain, and even now I'm still learning why the solution I arrived at works while the other approaches I tried did not. Feel free to take a look at my GitLab and email me if you have any specific questions!
Once my site was deployed, I ran into a few more problems. The first was the absence of connect/disconnect messages.
I wanted to notify the other people using this service when a user connected or disconnected (emphasis on other people, not the person who connected). This was a simple addition: immediately after I register a socket connection with the dedicated socket task, I send the task a command to broadcast a connection message to every session except the one that just connected. Similarly, on disconnect - after the session loop terminates (the web socket frame's OP code is 8, which means close) - I send a command to broadcast a disconnection message to every session except the one that disconnected.
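In sequence, the session handler's side of this might look like the following sketch. The enum mirrors the command names from earlier, but the std channel, u64 id, and message payloads are simplified assumptions made so the sketch runs on its own:

```rust
use std::sync::mpsc::channel;

// Mirrors the command names in my dedicated task; payload types here
// are simplified assumptions.
#[derive(Debug, PartialEq)]
enum WebSocketCommands {
    WebSocketBroadcastExcept(u64, Vec<u8>),
    WebSocketRemoveConn(u64),
}

fn main() {
    let (sender, receiver) = channel();
    let uuid = 42u64; // stand-in for uuid::Uuid::new_v4()

    // Right after registering the session: tell everyone else we joined.
    sender
        .send(WebSocketCommands::WebSocketBroadcastExcept(
            uuid,
            b"a user connected".to_vec(),
        ))
        .unwrap();

    // ... session loop runs until a close frame (OP code 8) arrives ...

    // After the loop: announce the departure, then deregister.
    sender
        .send(WebSocketCommands::WebSocketBroadcastExcept(
            uuid,
            b"a user disconnected".to_vec(),
        ))
        .unwrap();
    sender
        .send(WebSocketCommands::WebSocketRemoveConn(uuid))
        .unwrap();

    // The dedicated task would drain these in order.
    for command in receiver.try_iter() {
        println!("{:?}", command);
    }
}
```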
The other problem I ran into was keeping the socket alive. I hadn't realized this before, but if you leave a socket inactive, it closes automatically, so after a while of not doing anything on the frontend, it would stop working. To solve this, I implemented a "ping/pong" system: the client sends a dummy message to the socket server every 20 seconds (using setInterval), and the server sends an empty message back.
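Server-side, the handling can be as small as a check in the message path. A sketch - the "ping" marker is an assumption about the payload (mine was just a dummy message), not my actual wire format:

```rust
// Returns the reply for a keep-alive message, or None for a normal
// message that should go through the usual broadcast path.
fn keepalive_reply(payload: &[u8]) -> Option<Vec<u8>> {
    if payload == b"ping" {
        Some(Vec::new()) // empty message back; nothing is broadcast
    } else {
        None
    }
}

fn main() {
    assert_eq!(keepalive_reply(b"ping"), Some(Vec::new()));
    assert_eq!(keepalive_reply(b"hello"), None);
    println!("keep-alive ok");
}
```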
For more information, the merge request has the code I changed on the server side to accomplish this. In hindsight, not a lot was added - just some extra commands here and there and parsing of different message types.