Hey! I recorded two video courses!
If you like this article, you will also like the courses! Check them out here!
Welcome back to another blog post :) Before we begin, one question:
Why do cows have hooves instead of feet?
Because they lactose!
Alright, now let’s dive into today’s topic: How to listen to database changes using Postgres triggers and Elixir
🔗 Why listen at all?
In almost all codebases, you have subsequent processes depending on previous processes. Imagine a sign-up flow for new users. After a user signs up, you might want to start sending marketing emails periodically. This is called a downstream process
. The sign-up to the marketing emails depends on the user’s registration. When your software is still simple enough, you usually get away with calling these processes in sequence. Your code might look something like this:
def register_user(params) do
with {:ok, user} <- Repo.create_user(params),
:ok <- sign_up_for_marketing_emails(user) do
:ok
end
end
In this simple case, you only have one downstream
process: the sign-up for the marketing emails. But what if your software becomes more complex and you add more and more such downstream processes? Your register_user/1
function will grow and grow. Maybe your company grows as well and adds more dev teams. That’s great, but now you have more teams adding their own downstream calls to register_user/1
, making them all dependent on each other.
Imagine that one of the many calls fails. Usually, the entire user registration will roll back, undoing the changes in every team that placed their downstream call before the failed one. In the worst case, you might have an incomplete registration, where some calls succeed and others fail and you didn’t roll back properly. Now, the teams with failed calls need to somehow fetch the missing user data and reconcile it with everything that might have happened after the registration. It’s simply a mess.
A frequent solution to untangle these dependencies is to implement an event system. Phoenix.PubSub is such a system for example. Event systems have many advantages (and disadvantages), but in this case, they help us to reverse the code dependency between upstream and downstream processes. Without an event system, register_user/1
has to call sign_up_for_marketing_emails/1
directly. register_user/1
is aware of the downstream process and it needs to change if the downstream process changes, for example when we need to provide more info, like which marketing emails the user subscribed to and which ones they didn’t. Our upstream code, register_user/1
, depends on the downstream process, sign_up_for_marketing_emails/1
.
Now, with an event system, we can reverse that dependency. Our register_user/1
call could simply broadcast an event using PubSub.broadcast/4
. Our downstream process can then subscribe and react to the events. Now, our upstream code is unaware of the downstream process. It doesn’t care whether it succeeds or fails or if it changes. Instead, the downstream code now depends on register_user/1
to publish the necessary data. That’s it. If we want to change the marketing email sign-up, we simply handle the received data differently in sign_up_for_marketing_emails/1
. No need to change register_user/1
. Perfect!
🔗 How to listen?
I mentioned Phoenix.PubSub
before and if I’m implementing an Elixir application, that would be my go-to event system if I need to send and react to basic notifications. If I need more advanced systems, Kafka
, RabbitMQ
, or Amazon's SQS
would come to mind. But there’s another, low-effort solution that you probably already have in your tech stack: Postgres
.
Postgres
offers quick and simple Notifications that can help you react to changes in your database without much overhead. They are particularly interesting if you can’t use Phoenix’s PubSub
, for example, if another non-Elixir application also makes changes to your database.
Let’s see how you can set up Postgres notifications in your Phoenix app.
🔗 The Migration
The first step is to add the notifications using Postgres SQL. The migration below adds a trigger to a table called appointments
that sends a notification whenever an appointment is canceled or uncanceled. Let’s have a look:
defmodule Demo.Repo.Migrations.AddAppointmentCanceledTrigger do
use Ecto.Migration
def up do
execute """
CREATE OR REPLACE FUNCTION notify_canceled_changed()
RETURNS trigger AS $trigger$
DECLARE
payload TEXT;
BEGIN
IF (TG_OP = 'UPDATE') AND (OLD.canceled != NEW.canceled) THEN
payload := json_build_object('id',OLD.id,'old',row_to_json(OLD),'new',row_to_json(NEW));
PERFORM pg_notify('appointments_canceled_changed', payload);
END IF;
RETURN NEW;
END;
$trigger$ LANGUAGE plpgsql;
"""
execute """
CREATE TRIGGER appointments_canceled_changed_trigger
AFTER UPDATE ON appointments FOR EACH ROW
WHEN ( OLD.canceled IS DISTINCT FROM NEW.canceled )
EXECUTE PROCEDURE notify_canceled_changed();
"""
end
def down do
execute """
DROP TRIGGER appointments_canceled_changed_trigger ON appointments;
"""
execute """
DROP FUNCTION notify_canceled_changed();
"""
end
end
That’s a lot of SQL, but in short, the migration first adds a function called notify_canceled_changed()
which sends a notification to the channel appoiments_canceled_changed
with the id
of the appointment and the before and after state of the appointment encoded as JSON. We add the function as a trigger to the appointments
table using CREATE TRIGGER ...
. The function is only executed if the appointment is updated
, so not inserted
or deleted
, and only if the appointment.canceled
boolean field has changed.
In our notify_canceled_changed
function, we double-check these validations as a precaution (i.e. the record is updated and canceled
has changed). We wouldn’t need to though since Postgres will execute this function only if these requirements are met. But it never hurts to double-check!
You might have noticed that we don’t use an EVENT TRIGGER
here. The difference between a TRIGGER
and an EVENT TRIGGER
is that the latter handles database-wide events (e.g. CREATE TABLE
) whereas the former handles events specific to a given table (e.g. INSERT|UPDATE|READ|DELETE| row
). We are only interested in changes to the appointments
table which is why we use a simple TRIGGER
.
Another detail you might have spotted is that we execute the trigger after a row was updated with AFTER UPDATE ON
. For our use-case, the distinction between BEFORE
and AFTER
update doesn’t matter, but if you wanted to change the new data before it is written to the row, you would need to use the BEFORE UPDATE
statement. Read this for more details on the difference between the two.
If you want more examples of triggers, here are some.
Now with the migration in place, run mix ecto.migrate
to add the function and trigger to your database. However, if you run your application now and update an appointment, you won’t see anything. That’s because we’re not yet listening to such events from our Elixir application. Let’s change that.
🔗 The Listener
We will use the Postgrex.Notifications module to listen to and handle messages from Postgres. Here’s our listener:
defmodule Demo.DatabaseListener do
use GenServer
@channel "appointments_canceled_changed"
def start_link(init_args) do
GenServer.start_link(__MODULE__, [init_args], name: __MODULE__)
end
def init(_args) do
repo_config = Demo.Repo.config()
{:ok, pid} = Postgrex.Notifications.start_link(repo_config)
{:ok, ref} = Postgrex.Notifications.listen(pid, @channel)
{:ok, {pid, ref}}
end
def handle_info({:notification, _pid, _ref, @channel, payload}, state) do
payload = Jason.decode!(payload)
IO.puts("Received a notification for appointment #{payload["id"]}")
IO.puts(payload)
{:noreply, state}
end
end
We created a GenServer
for our listener with its own message queue to handle incoming Postgres messages. Be aware that this listener can easily become a bottleneck if you have lots of messages. If you can’t handle the messages quickly enough, the message queue will fill up and crash your application. If you’re worried about this case, you could create one listener per channel or use a PartitionSupervisor to start more handlers and spread out the work.
Our listener is pretty straightforward. First, we spawn another process that connects to Postgres using our Repo config. Then, we start listening to that process which means that we’ll receive any Postgres messages it might catch. Whenever a notification comes in, our handle_info/2
callback will handle it. In this case, we simply decode its JSON payload and log it to the console.
The last step is to start our listener whenever our application starts. Open your lib/application.ex
and add the listener to the supervisor’s children, like this:
# lib/application.ex
def start(_type, _args) do
children = [
# Other modules
Demo.Repo,
# Make sure to start the listener AFTER your Repo.
# Otherwise, your listener won't connect.
Demo.DatabaseListener
]
opts = [strategy: :one_for_one, name: Demo.Supervisor]
Supervisor.start_link(children, opts)
end
And that’s it! Now, whenever we start our application, we establish a link to Postgres and listen to messages on the given channel. If such a message comes in, our handle_info/2
will log it. Let’s try it out!
🔗 The Test
Now with everything in place, we can test-run the setup. Let’s create and update an appointment:
iex> {:ok, a} = Demo.Appointments.create_appointment(%{canceled: false})
{:ok, %Appointment{...}}
iex> {:ok, a} = Demo.Appointments.update_appointment(a, %{canceled: true})
{:ok, %Appointment{...}}
"Received a notification for appointment 1"
%{
"id" => 1,
"old" => %{
"id" => 1,
"canceled" => false,
"inserted_at" => "2023-02-11T17:36:56",
"updated_at" => "2023-02-11T17:37:03"
},
"new" => %{
"id" => 1,
"canceled" => true,
"inserted_at" => "2023-02-11T17:36:56",
"updated_at" => "2023-02-11T17:44:48"
}
}
After updating the canceled
-flag of our appointment, we received a message from Postgres! It works! Yey!
🔗 Conclusion
And that’s it! I hope you enjoyed this article! If you have questions or comments, let’s discuss them on Twitter. Follow me on Twitter or subscribe to my newsletter below if you want to get notified when I publish the next blog post. Until the next time! Cheerio 👋