Hey folks, today’s blog post is slightly longer than usual, but it’ll be worth it. To get you started, let me ask you:

Did you hear about the Italian chef who died?

He pasta-way

Alrighty, let’s dig in!

🔗 The Problem

A few weeks back, I watched Chris McCord live-code a Twitch clone during Elixir Stream Week. He implemented an architecture that received a creator’s video stream on one server and broadcast it to all other servers in the cluster using PubSub. This way, viewers could connect to any server around the globe and see a playback of the creator’s stream. Neat, right?

Chris's architecture: a creator records a stream to a server in the US, which then sends the stream in packages to servers in the EU, Asia, and LATAM, where groups of viewers connect.

However, during the presentation I wondered whether it wouldn’t be more efficient to connect all viewers to the creator’s server, which would remove the need to replicate the stream to every server on the network. I asked Chris, and he agreed that this would be interesting, but he didn’t have time to get into it.

So, here’s a blog post about how you can use Fly’s fly-replay header to connect users from around the globe to a single Fly machine, for example to the machine your creator streams to, using Phoenix and Phoenix LiveView. The post also highlights how you can keep the user connected to the server using sticky sessions.

🔗 The Setup

I created a small app (GitHub) and deployed it to the Dallas (DFW) and Amsterdam (AMS) regions on Fly.io. I ran two machines in Dallas and one in Amsterdam. Since I’m closest to Amsterdam, Fly would route all my requests to the Amsterdam machine by default. My goal was to change the machine to which Fly routes my request and WebSocket connection using the fly-replay header.

Here’s a demo of the app:

A demo of the chat app. It shows a textarea that receives a new text message every second and offers three tabs to change between streams

The app offered three tabs which simulated three creator streams. The user could jump between streams by clicking on them. The left stream ran on the Amsterdam server, whereas the middle and right streams ran on one of the two Dallas servers. Whenever the user clicked on a stream, they would be rerouted to the correct machine using the fly-replay header and establish a WebSocket connection with the same machine that the creator was also “streaming” to.

🔗 The Plug

It took me a few attempts, but eventually I got it working using this Plug:

# lib/chat_web/plugs/redirect_to_machine.ex

defmodule ChatWeb.Plugs.RedirectToMachine do
  use Plug.Builder

  import Plug.Conn

  require Logger

  @cookie_key "fly-machine-id"
  # Plug's :max_age option is given in seconds, so this is 6 days.
  @cookie_ttl 6 * 24 * 60 * 60

  def call(conn, opts) do
    conn
    |> fetch_query_params()
    |> fetch_cookies()
    |> handle_conn(opts)
  end

  def handle_conn(%Plug.Conn{params: params} = conn, _opts) do
    machine_id = Application.get_env(:chat, :fly_machine_id)
    param_id = Map.get(params, "instance")
    cookie_id = Map.get(conn.req_cookies, @cookie_key, machine_id)

    cond do
      param_id && param_id == machine_id ->
        Logger.info("Correct machine based on parameter #{param_id}. Set cookie and let pass.")
        put_resp_cookie(conn, @cookie_key, machine_id, max_age: @cookie_ttl)

      param_id && param_id != machine_id ->
        Logger.info("Incorrect machine #{machine_id} based on parameter #{param_id}. Redirect.")
        redirect_to_machine(conn, param_id)

      cookie_id && cookie_id == machine_id ->
        Logger.info("Correct machine based on cookie #{cookie_id}. Let pass.")
        conn

      cookie_id && cookie_id != machine_id ->
        Logger.info("Incorrect machine #{machine_id} based on cookie #{cookie_id}. Redirect.")
        redirect_to_machine(conn, cookie_id)

      true ->
        Logger.info("No parameter or cookie. Let pass.")
        conn
    end
  end

  defp redirect_to_machine(conn, requested_machine) do
    conn
    |> put_resp_header("fly-replay", "instance=#{requested_machine}")
    |> put_status(307)
    |> Phoenix.Controller.text("redirecting...")
    |> halt()
  end
end

In essence, the plug above decides whether to redirect the request based on three variables:

  1. The current machine ID (machine_id)
  2. The machine ID requested as a parameter (param_id)
  3. The machine ID set in the cookie (cookie_id)

I created the decision tree below to visualize the decision-making process of the Plug. Have a look:

The decision flow for redirecting a request based on its parameter or cookie.

As an example, if the request has a parameter and the parameter machine ID is different from the current machine ID, the plug redirects the request to the parameter machine ID using the fly-replay header.
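To make the decision flow easier to poke at in isolation, here is a minimal sketch of the same logic as a pure function without any Plug dependency. The module name `RoutingDecision` and the return values (`:pass_and_set_cookie`, `:pass`, `{:replay, id}`) are my own placeholders for illustration, not part of the demo app:

```elixir
defmodule RoutingDecision do
  @moduledoc """
  Hypothetical, dependency-free version of the plug's decision logic.
  All names in this module are illustrative assumptions.
  """

  @type decision :: :pass_and_set_cookie | :pass | {:replay, String.t()}

  @spec decide(String.t(), String.t() | nil, String.t() | nil) :: decision
  def decide(machine_id, param_id, cookie_id) do
    cond do
      # A parameter always wins over the cookie.
      param_id == machine_id -> :pass_and_set_cookie
      param_id != nil -> {:replay, param_id}
      # No parameter: fall back to the cookie, if there is one.
      cookie_id == nil or cookie_id == machine_id -> :pass
      true -> {:replay, cookie_id}
    end
  end
end

# A request with ?instance=dfw1 that lands on the Amsterdam machine:
RoutingDecision.decide("ams1", "dfw1", nil)
```

A request without a cookie simply passes, which matches the plug above, where a missing cookie defaults to the current machine ID.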

An important detail is that I store the current machine ID in a cookie when the request reaches the correct machine. This matters because only the user’s initial HTTP request contains the machine ID parameter. The subsequent WebSocket connection request does not, but it does carry the cookie. So, to reroute the WebSocket connection as well, I needed the cookie.

By setting the cookie, we ensure that all future requests, static HTTP or WebSocket upgrades, are redirected to the same machine. This makes our session “stick” to a single machine, hence the name “sticky session”.

🔗 The Endpoint

Using the plug was a bit tricky as well. If you add the plug to your endpoint as a regular plug, it won’t intercept the WebSocket connection requests because these don’t go through the endpoint’s regular plug pipeline. Thanks to Chris for pointing this out in the Elixir Slack.

Instead, you need to override the call/2 callback of your Endpoint like this:

# lib/chat_web/endpoint.ex

defmodule ChatWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :chat

  def call(conn, opts) do
    case ChatWeb.Plugs.RedirectToMachine.call(conn, opts) do
      %Plug.Conn{halted: true} = conn -> conn
      conn -> super(conn, opts)
    end
  end

  # ...
end

The code above will intercept all requests going through your endpoint, including WebSocket connection requests. We use our plug to decide whether to let them pass or redirect them.

If we redirect, we include the fly-replay: instance=#{machine_id} header and respond with a 307 Temporary Redirect so that the Fly proxy can redirect the request to the correct machine. If we allow the request to pass, we call the default Endpoint.call/2 callback using super(conn, opts), which continues with the regular endpoint execution.

🔗 The Environment

The next step of the setup is to fetch the machine ID from the environment variables. Fly adds a number of environment variables to your machine. This is my runtime.exs:

# config/runtime.exs

config :chat,
  fly_region: System.get_env("FLY_REGION", "blank"),
  fly_machine_id: System.get_env("FLY_MACHINE_ID", "blank"),
  fly_vm_memory_mb: System.get_env("FLY_VM_MEMORY_MB", "blank")

🔗 Discover Machines

The final step of the setup is to track which “creators” are currently online and to which machine they are streaming. The exact implementation of this discovery depends on your use case, so I’ll just link to the implementation I used for my demo project.

In essence, upon startup and every 10 seconds thereafter, I send a discovery message through PubSub from the machine to all other machines in the cluster, and the other machines reply with their machine ID and which creator they host. This way, I maintain a list of all machine IDs and their creators on each machine.

This implementation works for my demo project, but in a real-world application, I would prefer to send a PubSub message whenever a creator starts or stops streaming. This eliminates the need to rediscover the same machines every 10 seconds.
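Here is a rough sketch of what that event-based variant could look like. It is not the implementation from the demo project: the module name `CreatorRegistry` and the message shapes `{:creator_started, creator, machine_id}` and `{:creator_stopped, creator}` are assumptions I made up for this example. To keep it runnable without Phoenix, the messages are sent directly to the process; in a real app they would arrive through a PubSub subscription.

```elixir
defmodule CreatorRegistry do
  @moduledoc """
  Hypothetical sketch: tracks which machine each creator streams to.
  Module name and message shapes are assumptions, not the demo's code.
  """
  use GenServer

  # Client API

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, opts)
  end

  @doc "Returns the machine ID for a creator, or nil if the creator is offline."
  def machine_for(server, creator) do
    GenServer.call(server, {:machine_for, creator})
  end

  # Server callbacks

  @impl true
  def init(state) do
    # In a Phoenix app, this is where one might subscribe to a topic, e.g.
    # Phoenix.PubSub.subscribe(Chat.PubSub, "creators") (names assumed).
    {:ok, state}
  end

  @impl true
  def handle_call({:machine_for, creator}, _from, state) do
    {:reply, Map.get(state, creator), state}
  end

  @impl true
  def handle_info({:creator_started, creator, machine_id}, state) do
    {:noreply, Map.put(state, creator, machine_id)}
  end

  def handle_info({:creator_stopped, creator}, state) do
    {:noreply, Map.delete(state, creator)}
  end

  # Ignore anything else that lands in the mailbox.
  def handle_info(_msg, state), do: {:noreply, state}
end

# Usage: simulate a creator going live on a Dallas machine.
{:ok, pid} = CreatorRegistry.start_link()
send(pid, {:creator_started, "alice", "286533db064e58"})
CreatorRegistry.machine_for(pid, "alice")
```

A machine that joins the cluster late would still need one initial sync to learn about streams that are already running, so some form of discovery on startup probably remains useful.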

🔗 Putting it Together

With this setup in place, we can redirect a user to a specific machine simply by passing the machine ID as a parameter. Neat, right?

{:noreply, redirect(socket, to: ~p"/?#{[instance: machine_id]}")}
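If verified routes aren't available, for example in a script or a test, the same path can be built with plain Elixir. This is only a sketch: `StreamPath` is a hypothetical helper name, and the `"instance"` key is the parameter the plug reads.

```elixir
defmodule StreamPath do
  # Hypothetical helper: builds the path that sends a user to a given machine.
  # URI.encode_query/1 takes care of escaping the parameter value.
  def for_machine(machine_id) when is_binary(machine_id) do
    "/?" <> URI.encode_query(%{"instance" => machine_id})
  end
end

StreamPath.for_machine("286533db064e58")
# => "/?instance=286533db064e58"
```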

If we look at the logs, we can see how the HTTP request and WebSocket connection get redirected from the Amsterdam server to the same machine in Dallas:

# Static HTTP Request
ams [info] Incorrect machine d8dd626f379508 based on parameter 286533db064e58. Redirect.
dfw [info] Correct machine based on parameter 286533db064e58. Set cookie and let pass.
dfw [info] GET /
dfw [info] Sent 200 in 1ms

# WebSocket Connection Request
ams [info] Incorrect machine d8dd626f379508 based on cookie 286533db064e58. Redirect.
dfw [info] Correct machine 286533db064e58 and cookie. Let pass.
dfw [info] CONNECTED TO Phoenix.LiveView.Socket in 34µs
dfw [info]  Transport: :websocket
dfw [info]  Serializer: Phoenix.Socket.V2.JSONSerializer
dfw [info]  Parameters: %{"_csrf_token" => "..."}

🔗 Conclusion

And that’s it! I hope you enjoyed this article! If you want to support me, you can buy my firewall for Phoenix Phx2Ban or my book or video courses (one and two). Follow me on Bluesky or subscribe to my newsletter below if you want to get notified when I publish the next blog post. Until next time! Cheerio 👋
