Listen to Database Changes through the Postgres WAL

Welcome back, friends!

Did you hear the news about the inventor of the Knock-Knock joke?
He got a No-bell prize.

Alright, this post will be a bit longer since we'll dive deeply into the Postgres WAL and will explore how to listen for database changes through it.

The Problem

Let's say you want to react to changes in your Postgres database, for example to send out realtime notifications or to track the changes in an audit log. You consult Google (or let's be honest: your favorite LLM) and learn about NOTIFY/pg_notify and triggers.

NOTIFY/pg_notify

You decide to try pg_notify on a small, not so active database table. It works! Great.
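
In case you haven't seen this pattern before, here's roughly what such a trigger looks like. This is just an illustration; the orders table, function, and channel name are made up:

CREATE OR REPLACE FUNCTION notify_order_change() RETURNS trigger AS $$
BEGIN
  -- Send the changed row as JSON on the 'order_changes' channel
  PERFORM pg_notify('order_changes', row_to_json(NEW)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER orders_notify
AFTER INSERT OR UPDATE ON orders
FOR EACH ROW EXECUTE FUNCTION notify_order_change();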

You roll it out to a larger, much more active database table. It works, but suddenly your query times explode! Instead of doing 1 million transactions per second*, you can now do only 3. (*These numbers were exaggerated for dramatic effect.)

So, what happened?

The problem with Postgres' NOTIFY is that all notifications go through a single queue!

Even if you have 20 database connections making 20 transactions in parallel, all of them need to wait for their turn to lock the notification queue, add their notification, and unlock the queue again. This creates a bottleneck, especially in high-throughput databases.

pg_notify also has other limitations:

  • Its notification size is limited to 8000 bytes. If that's not enough to fit all the data into the notification, you'll need to query the database for the full dataset, which adds additional load to your database.
  • It does not wait for an acknowledgment from the listener and does not retry messages, which means that if your listener is dead or can't receive the message, the notification is lost.
  • If you call pg_notify or NOTIFY inside a trigger and change 100,000 rows in a single transaction, the trigger fires 100,000 times and sends out 100,000 notifications, which from a performance perspective is ... not ideal.

You realize that pg_notify doesn't work for you. But there must be another way, right?

There is and it's called the Postgres Write-Ahead Log (WAL).

The WAL 🐳

In short, the WAL is a log of every change to your database. Every transaction must save its changes to both the WAL and the data pages, or it will be rolled back.

The WAL's main benefit is that it lets Postgres delay writing data pages to disk after a transaction finishes. Without the WAL, Postgres would have to save all changes to disk right after a transaction completes. If it didn't and the system crashed, the changes would be lost.

With the WAL, Postgres only needs to write one file to disk (the WAL) and can wait to write the data pages later. If a crash happens, it can recreate the data pages from their latest version on disk and the changes tracked in the WAL. This significantly reduces the writes to disk, which are slow compared to changing a row in memory, and it allows Postgres to support much higher transaction throughputs.

The WAL allows Postgres to support a range of other functionality as well, like:

  • Database replication - replaying the WAL's changes on a follower database is the most common method for creating database replicas
  • Online/streamed backups - following the WAL means you can have database backups that are always up-to-date. Without the WAL, you'd need to take snapshots at a high frequency and would still miss the latest changes!
  • Point-in-time recovery (PITR) - did you ever wonder how a database provider can rebuild your database at a specific point in time? By recording the WAL and rebuilding your database based on its changes up to that time!
  • Change Data Capture (CDC) - the WAL enables listening to database changes in real-time, which is what this post is all about! You can use it to send notifications and maintain audit logs, all without having to change your application code.

Now, let's see how we can listen to the WAL using Elixir.

Prior Art

The following code was heavily inspired by this exploration, written by my friend Filipe Cabaço, who nerd-sniped me into this topic, which caused me to write about it, which eventually forced you to read about it! So, now you know who to blame if you're unhappy with this post ;-)

Also, a warning that the following code wasn't tested in production. See Supabase's Realtime repo for a production-ready solution (and blame Filipe, who has written a large chunk of it, for any bugs).

I'll introduce the solution piece by piece, but you can find the complete and final version on my GitHub.

The Database

Creating a listener for the WAL requires a few steps. The first is to configure your Postgres database to store logical WAL records as well. We will do that by setting the wal_level of your database.

The wal_level is similar to a log level and it determines how much information is written to the WAL.

By default, it is set to replica which stores enough information to replicate the database, recover lost state after a crash, and enable point-in-time recovery. It balances overhead with functionality and serves well as a default.

If you don't need the functionality of the replica level, you can set your database to track only minimal records. This level supports crash recovery, but not database replication or point-in-time recovery. It is faster than replica though.

For our use-case, we need the third level, logical. This level stores the same data as replica but also tracks changes in a higher-level format that describes what changed (e.g. "a row with these values was inserted into this table") rather than how it changed (e.g. "these bytes were written to this page"). It can lead to significantly more overhead, especially if you configured tables with REPLICA IDENTITY FULL or make many UPDATE or DELETE calls, so keep an eye on your database usage levels after changing the wal_level.


Side-note

If you read up on the WAL, you might encounter the topic of physical vs logical database replication. These terms are related to the wal_level but are not exactly the same.

Physical replication means to replicate a database byte-by-byte based on the replica WAL records. These records track low-level operations like removing stale objects (e.g. VACUUM) or updating indices (e.g. INSERT_LEAF) as well as the INSERT etc. operations that actually change the data. If you replicate a database "physically", it means that your replica database will perform the exact same actions (e.g. removing old WAL records) as the primary database which gives you as close to a "physical" copy of the original database as possible.

Logical replication means that your replica database only applies the high-level logical WAL records to its state but does its own housekeeping like vacuuming or creating indices. This doesn't give you a perfect clone which is always in exactly the same state as your primary database, but it gives you a copy of the data.

Both replication types have their trade-offs. Physical replication is faster and creates a perfect byte-for-byte copy. However, it requires the replica to run the same Postgres version and architecture as the primary. Logical replication works across different Postgres versions, lets you replicate only specific tables, and allows the replica to have different indexes or even additional tables. But it comes with more overhead and requires the logical wal_level which stores more data in the WAL. It also requires you to run migrations against the replica database yourself since it does not replicate changes to the database structure.


Now, back to the wal_level. Let's first change the wal_level of our database to logical. Open up an editor connection to your database and run:

-- Check the current level
SHOW wal_level;
-- Change the level to 'logical'
ALTER SYSTEM SET wal_level='logical';

You need to restart your database for the configuration to take effect. Once it's restarted, you can check the wal_level with:

SHOW wal_level;

It should now return logical.

CI and Docker Configuration

If you run Postgres using Docker, you need to add a startup command to configure the wal_level:

services:
  postgres:
    image: postgres:18
    env:
      ...
    command: ["postgres", "-c", "wal_level=logical"]

In your CI, you can start the database like this:

- name: "Set PG settings"
run: |
docker exec ${{ job.services.postgres.id }} sh -c 'echo "wal_level=logical" >> /var/lib/postgresql/data/postgresql.conf'
docker restart ${{ job.services.pg.id }}

Code snippets come from the Postgrex.ReplicationConnection documentation

Interlude

Before we continue, I need to tell you about publications and replication slots and about the difference between temporary and persistent replication slots.

Publications

First, a Publication is a way to tell Postgres which tables and operations (inserts, updates, deletes) you want to track. Think of it as an event "topic" which will only broadcast events for the tables and operations you have specified. Here's an example:

CREATE PUBLICATION active_user_publication
FOR TABLE users
-- Filters are available only from Postgres 15+
WHERE (active IS TRUE)
WITH (publish = 'insert,delete');

The publication above will publish only the INSERT, DELETE, BEGIN, and COMMIT events for the active users in the users table.

Here's another example:

CREATE PUBLICATION everything_publication FOR ALL TABLES;

This publication will publish all events (BEGIN, INSERT, UPDATE, DELETE, and COMMIT) for all rows and tables. That might be a lot, so when you add your listener, it's better to restrict the publication to only what you need. You can always change a publication later with an ALTER PUBLICATION call, as shown below.
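
For example, these two hypothetical changes narrow the publication above down to a single table, or to only INSERT events:

-- Publish changes for only one table from now on
ALTER PUBLICATION everything_publication SET TABLE users;
-- Publish only inserts, dropping updates and deletes
ALTER PUBLICATION everything_publication SET (publish = 'insert');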

Alright, now you know about publications and how to create them. Next, let's dive into replication slots.

Replication Slots

A Replication Slot is not a separate copy of the WAL but rather a cursor into it, created for a single consumer, in our case the WAL listener. It tracks how far the listener has read through the WAL, and Postgres removes the WAL records a slot still needs only once the listener has acknowledged them. This way, records that the listener hasn't processed yet stick around, even after every other part of the system is done with them. It also lets Postgres resend records if the listener crashes.

Now, if you had a peek at Filipe's or Supabase's implementation above, you might have spotted that they use a temporary replication slot. The reason for this is that replication slots can create serious problems for the database if we don't consume them. The WAL records they retain will keep piling up until they fill all the disk space allocated to the WAL, because Postgres deletes those records only after we consume them.

A temporary replication slot avoids this issue because Postgres deletes it whenever we lose connection to the database. So, if our listener crashes or we remove the listener from our application, Postgres will automatically clean up the replication slot for us which is great from a not-taking-down-the-database perspective.

The downside of using a temporary replication slot is that we have to keep the listener online at all times if we want to listen to all database changes. You might see the problem with this already. If the listener is offline, it will miss change events which is not great from a not-missing-any-events perspective.

If never missing a database change is important to you, a persistent replication slot might be the better choice. Postgres will always write WAL records to it, even if your listener is offline. When it comes back online, the replication slot will send the WAL records it has missed. If the listener crashes before it can acknowledge the records, the replication slot will resend them when the listener restarts.

Be warned though that persistent replication slots allow only one connection, so you can have only one listener at a time. Also, every implementation I've seen in my research for this post has warned against using a persistent slot, but other than filling up the disk, I have yet to understand what other downsides it brings.
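
One safeguard worth knowing about: since Postgres 13, you can cap how much WAL a slot may retain. Note the trade-off, though: a slot that falls further behind than this limit is invalidated and your listener loses those changes, so it trades delivery guarantees for disk safety:

-- Allow each slot to retain at most 10GB of WAL (Postgres 13+)
ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';
-- This setting is reloadable, no restart needed
SELECT pg_reload_conf();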

Alright, now with these pros and cons in mind, let's continue. I will show you how to create a persistent replication slot. If you want a temporary one, have a look at Supabase's implementation above.

The Migrations

We need two migrations, one to create the publication and one to create the replication slot. We can't use a single migration for both because we must create the replication slot in its own transaction and Ecto wraps a migration into a single transaction. So, two migrations it is.

First, create the publication migration with:

mix ecto.gen.migration create_wal_publication

In there, create your publication:

defmodule Wal.Repo.Migrations.CreateWalPublication do
  use Ecto.Migration

  def change do
    # Adding this extension is optional but helps with
    # debugging WAL messages inside Postgres.
    #
    # It's not needed to listen to the WAL but we'll use
    # it for debugging purposes. It is available from
    # Postgres 15 onwards. If you're on an older version
    # of Postgres: Why?
    execute """
            CREATE EXTENSION IF NOT EXISTS pg_walinspect;
            """,
            """
            DROP EXTENSION IF EXISTS pg_walinspect;
            """

    # Define your publication below:
    execute """
            CREATE PUBLICATION wal_listener_publication FOR ALL TABLES;
            """,
            """
            DROP PUBLICATION IF EXISTS wal_listener_publication;
            """
  end
end
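
Since we just installed pg_walinspect, here's the kind of debugging it enables: inspecting raw WAL records straight from SQL. A quick sketch; the start LSN is hypothetical, so replace it with one from your own database, for example your slot's restart_lsn:

-- Peek at the raw WAL records between two LSNs (Postgres 15+)
SELECT start_lsn, resource_manager, record_type, record_length
FROM pg_get_wal_records_info('4/4FC8CB80', pg_current_wal_lsn())
LIMIT 10;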

Alright, now let's create the persistent replication slot with:

mix ecto.gen.migration create_wal_replication_slot

And add the following to the new file:

defmodule Wal.Repo.Migrations.CreateWalReplicationSlot do
  use Ecto.Migration

  def change do
    execute """
            SELECT pg_create_logical_replication_slot('wal_listener', 'pgoutput', false);
            """,
            """
            SELECT pg_drop_replication_slot('wal_listener');
            """
  end
end

Let's take a look at the arguments of the pg_create_logical_replication_slot/3 call.

The first argument (wal_listener) is the name of your replication slot. The second argument (pgoutput) defines the output plugin the replication slot should use to convert the raw WAL records into a readable output format. pgoutput is the default and comes with Postgres, but we could also use other plugins like wal2json. The third argument (false) means that this replication slot is not temporary.

Alright. Now, if you execute these two migrations with mix ecto.migrate, you should see the following records in your database:

> SELECT * FROM pg_publication;
oid          | 16409
pubname      | wal_listener_publication
pubowner     | 10
puballtables | TRUE
pubinsert    | TRUE
pubupdate    | TRUE
pubdelete    | TRUE
pubtruncate  | TRUE
pubviaroot   | FALSE
pubgencols   | n

> SELECT * FROM pg_replication_slots;
slot_name           | wal_listener
plugin              | pgoutput
slot_type           | logical
datoid              | 26370
database            | wal_dev
temporary           | FALSE
active              | FALSE
active_pid          | NULL
xmin                | NULL
catalog_xmin        | 142253
restart_lsn         | 4/4FC8CB80
confirmed_flush_lsn | 4/4FC8CBB9
wal_status          | reserved
safe_wal_size       | NULL
two_phase           | FALSE
two_phase_at        | NULL
inactive_since      | 2025-10-27 14:30:54.98639+00
conflicting         | NULL
invalidation_reason | NULL
failover            | FALSE
synced              | FALSE

If you see an output similar to the one above, your migrations successfully created the publication and replication slot we need to listen to the WAL. So, let's do that next.

The Listener

As with so many things in the BEAM ecosystem, we can build our WAL listener on the shoulders of giants, in this case the Postgrex.ReplicationConnection, which handles the low-level communication with Postgres for us. Here is the first part of our Listener module:

defmodule Wal.Replication do
  use Postgrex.ReplicationConnection

  require Logger

  def start_link(_opts) do
    config = Wal.Repo.config()

    # Automatically reconnect if we lose connection.
    extra_opts = [
      auto_reconnect: true
    ]

    Postgrex.ReplicationConnection.start_link(
      __MODULE__,
      :ok,
      extra_opts ++ config
    )
  end

  @impl Postgrex.ReplicationConnection
  def init(:ok) do
    {:ok, %{messages: [], relations: %{}}}
  end

  @impl Postgrex.ReplicationConnection
  def handle_connect(state) do
    query =
      """
      START_REPLICATION SLOT wal_listener
      LOGICAL 0/0
      (proto_version '1', publication_names 'wal_listener_publication')
      """

    Logger.debug(query)

    {:stream, query, [], state}
  end
end

Now, what happens here?

First, we pull in all the nitty-gritty low-level logic necessary to connect, reconnect, query, and stream from Postgres with just one line:

use Postgrex.ReplicationConnection

Next, we define the start_link/1 function to start the listener. We pull in the existing database configuration from our Repo with Wal.Repo.config(). This allows us to connect the WAL listener to our application database without duplicating the connection configuration, but you could also provide your own connection details.

We add the extra option auto_reconnect: true to instruct Postgrex to automatically reconnect our ReplicationConnection. And lastly, we call the start_link/3 function on Postgrex.ReplicationConnection which uses :gen_statem and the Postgrex.Protocol to create a database connection for us.

Now that we defined how to start the listener, we must implement the init/1 callback. It simply returns our initial state, which is %{messages: [], relations: %{}}. The messages list will hold the WAL messages we receive from Postgres until we can handle them, and the relations map will hold information about the tables in which the WAL changes occurred. But more about these later.

Lastly, we implement one optional callback, handle_connect/1. This callback allows us to send a query to Postgres after the database connection was established.

START_REPLICATION SLOT wal_listener
LOGICAL 0/0
(proto_version '1', publication_names 'wal_listener_publication')

With this query, we tell Postgres that it can start sending the logical WAL records from the wal_listener replication slot our way. We tell it to start at the Log Sequence Number (LSN) 0/0, which is the very first record ever recorded. This doesn't mean it will send us all WAL records ever created though.

Our replication slot starts tracking WAL records only after we create it, and it doesn't backfill records from before. So, unless we create our replication slot before we store a single row, it will miss WAL records. Just keep that in mind.

Lastly, we provide the options proto_version and publication_names. The publication name is the publication we created earlier. As proto version, we use version 1. You can also use proto versions 2, 3, or 4, which are useful if you have very large transactions or two-phase transactions.

On proto version 1, Postgres will wait until very large transactions commit and only then send the WAL records, but on versions 2 and 4, it will send the records while the transactions are still in progress, optionally in parallel on version 4. Version 3 supports two-phase commits, which are useful for preparing and committing distributed transactions across multiple databases at once.

If you need these advanced features, make sure to also add the streaming 'on' or 'parallel' option for streaming in-progress transactions, and the two_phase option for two-phase commits, to the START_REPLICATION query, as sketched below.
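
Put together, such a query could look like this. A sketch only: two_phase additionally requires the replication slot to be created with two-phase support, and streaming 'parallel' requires proto version 4:

START_REPLICATION SLOT wal_listener
LOGICAL 0/0
(proto_version '4', publication_names 'wal_listener_publication', streaming 'parallel', two_phase 'true')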

Alright, now that we told Postgres to start sending WAL records our way, let's see how we can receive and decode the messages, starting with the keep-alive heartbeat.

Handling Heartbeats

If you look at the docs of START_REPLICATION, you'll see that Postgres will send us two types of messages: XLogData and the Primary Keep-Alive Message. XLogData contains the data of one logical WAL record in binary format. The Primary Keep-Alive Message is a heartbeat from Postgres to our listener and has four purposes:

  1. It checks that our listener is still alive, hence the name. If our listener fails to respond, Postgres will close the connection.
  2. It tells the listener that our Postgres instance is still alive and our replication slot is active.
  3. It provides the LSN of the latest WAL record our replication slot has recorded.
  4. It gives the listener the opportunity to acknowledge the XLogData messages it has received since the last keep-alive message by replying with a Standby Status Update or Hot Standby Feedback message.

Let's implement the last step, which is to receive and to reply to a keep-alive message.

Here's how the listener handles this:

# Primary Keep Alive Message
# https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-PRIMARY-KEEPALIVE-MESSAGE
@impl Postgrex.ReplicationConnection
def handle_data(<<?k, wal_end::64, _server_time::64, should_reply::8>>, state) do
  messages =
    case should_reply do
      # Standby Status Update
      # https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-STANDBY-STATUS-UPDATE
      1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
      0 -> []
    end

  Logger.debug("Responding to keep alive: #{should_reply} - #{inspect(messages)}")

  {:noreply, messages, state}
end

# Postgres expects timestamps as microseconds since 2000-01-01.
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
defp current_time(), do: System.os_time(:microsecond) - @epoch

Our handle_data/2 callback receives both the keep-alive and XLogData messages. Here, we match against the keep-alive message, which starts with the character k. After the identifier, we receive the latest LSN our replication slot has recorded, the server time, and a one-byte flag should_reply, which Postgres might set if it wants to know that our listener is still active.

If we should reply, we respond with a Standby Status Update message. This message starts with an r as identifier, followed by the LSN of the last message that we have received and written to disk (first LSN), that we have flushed to disk (second LSN), and that we have applied (third LSN). In this case, we are not too sensitive about losing messages, which is why we simply return the last LSN from the server three times, assuming that we have received and processed all messages it has sent to us successfully.

However, if we wanted to be absolutely sure that we never lose a message, we could track which messages we wrote to a buffer (first LSN), which messages we flushed from the buffer to disk or other persistence method (second LSN) and which messages we applied to our database or other state (third LSN). Postgres will consider a message acknowledged only after we mark it as "applied". We usually don't need this level of detail but it's good to know about.

We respond with the simplified Standby Status Update message, but we could also have sent a Hot Standby Feedback Message. The difference between Standby and Hot Standby is that Hot Standbys run queries against their state and these queries might touch records that have been deleted on the primary but are still needed on the Hot Standby. For example, imagine that your listener receives the INSERT message for a record, adds it to its state, and then runs a query against that state. Now, if Postgres sends us a DELETE message for that record before our query completes, the listener cannot apply that message until the query completes. Otherwise, it would "pull the rug" from underneath the query so to speak.

So, the listener buffers the message and waits for the query to complete. While it is waiting, it would reply to Postgres with a Hot Standby Feedback Message that tells Postgres "Hey, hang on a sec. I first gotta finish this work before I can acknowledge all the new messages you've sent me". Postgres would then keep the WAL records that it has sent until the listener sends a regular Standby Status Update which acknowledges the old WAL records. Again, you probably won't ever need to send a Hot Standby Feedback Message, but it's good to know about.

Decoding Messages

Now that we can receive and respond to keep-alive messages, let's see how our listener can receive and decode the logical WAL records received as XLogData messages.

First, we need to create a second handle_data/2 function which matches against the XLogData format:

# XLogData
# https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-XLOGDATA
def handle_data(<<?w, raw_lsn::64, _latest_lsn::64, _server_time::64, payload::bytes>>, state) do
  payload = Wal.Decoder.parse(payload)
  lsn = Wal.Decoder.to_lsn(raw_lsn)

  message = %{
    lsn: lsn,
    type: payload.type,
    payload: payload
  }

  Logger.debug(message)

  state = handle_message(message, state)

  {:noreply, state}
end

# Will be implemented later
defp handle_message(_message, state), do: state

XLogData messages start with a w as identifier, followed by the LSN of the WAL record, the LSN of the latest WAL record in the replication slot, the server timestamp, and the message payload in bytes. For our use-case, we're only interested in the payload, but we could use the latest LSN to see how far our listener lags behind, and we could use the timestamp to calculate our latency to the server.

But for now, let's focus on decoding the message payload. I will show you how to decode a few messages yourself, but for a complete decoder of all possible messages, have a look at Supabase's Realtime decoder or the PgoutputDecoder project.

Now, here's how to decode the BEGIN, INSERT, and COMMIT messages:

defmodule Wal.Decoder do
  # Begin
  # https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-BEGIN
  def parse(<<?B, lsn_end::64, _timestamp::64, tx_id::32>>) do
    %{type: :begin, lsn_end: to_lsn(lsn_end), tx_id: tx_id}
  end

  # Insert
  # Without transaction_id because we are not streaming transactions
  # https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-INSERT
  def parse(<<?I, relation_id::32, ?N, tuple_data::bytes>>) do
    %{type: :insert, relation_id: relation_id, data: parse_tuple_data(tuple_data)}
  end

  # Commit
  # https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-COMMIT
  def parse(<<?C, _flags::8, lsn::64, lsn_end::64, _timestamp::64>>) do
    %{type: :commit, commit_lsn: to_lsn(lsn), tx_end_lsn: to_lsn(lsn_end)}
  end

  # Relation
  # https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-RELATION
  def parse(<<?R, _relation_id::32, _rest::bytes>>) do
    # Parses a Relation message
  end

  defp parse_tuple_data(_data) do
    # Parses a TupleData struct
  end

  def to_lsn(_lsn_int) do
    # Converts an integer LSN to the hex format "XXX/XXX"
  end
end

As you can see, every message starts with an identifying character and otherwise follows the specs in the docs. I only stubbed the parse_tuple_data/1 and to_lsn/1 functions and the Relation parser, since this post is already long enough. Please see my partial Decoder on GitHub for more details on the actual implementation.
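
If you're curious, to_lsn/1 is small enough to sketch here. An LSN is a 64-bit integer whose upper and lower 32 bits are rendered in hex, separated by a slash, matching the "4/5A7D6730" values you'll see below:

def to_lsn(lsn_int) do
  # Split the 64-bit LSN into its high and low 32-bit halves
  <<high::32, low::32>> = <<lsn_int::64>>
  "#{Integer.to_string(high, 16)}/#{Integer.to_string(low, 16)}"
end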

Alright, now our listener can receive and decode WAL messages. Let's take it for a spin and see what events we receive for a typical insert transaction.

Receiving Messages

First, we need to start the listener in our Application just after we start the Repo connection:

defmodule Wal.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      WalWeb.Telemetry,
      Wal.Repo,
      Wal.Replication
      # Others
    ]

    opts = [strategy: :one_for_one, name: Wal.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Let's start the application with iex -S mix phx.server and create a test record. I will create a test Meeting record which has a title and a from and until timestamp.

iex> Wal.Meetings.create_meeting(%{
...>   title: "Test Meeting",
...>   from: ~U[2025-01-01 10:00:00Z],
...>   until: ~U[2025-01-01 11:00:00Z]
...> })
{:ok, meeting}

These are the WAL messages our listener has received:

[type: :begin, payload: %{
  type: :begin,
  lsn_end: "4/5A7D7E10",
  tx_id: 385169
}, lsn: "4/5A7D6730", latest_lsn: "4/5A7D6730"]
[type: :relation, payload: %{
  type: :relation,
  columns: [
    %{name: "id", flag: 1, data_type_oid: 20, type_modifier: 4294967295},
    %{name: "title", flag: 0, data_type_oid: 1043, type_modifier: 259},
    %{name: "from", flag: 0, data_type_oid: 1114, type_modifier: 0},
    %{name: "until", flag: 0, data_type_oid: 1114, type_modifier: 0},
    %{name: "inserted_at", flag: 0, data_type_oid: 1114, type_modifier: 0},
    %{name: "updated_at", flag: 0, data_type_oid: 1114, type_modifier: 0}
  ],
  relation_id: 26378,
  namespace: "public",
  relation_name: "meetings",
  replica_identity_setting: :default
}, lsn: "0/0", latest_lsn: "0/0"]
[type: :insert, payload: %{
  data: [
    %{type: :text, value: "55", length: 2},
    %{type: :text, value: "Test Meeting", length: 12},
    %{type: :text, value: "2025-01-01 10:00:00", length: 19},
    %{type: :text, value: "2025-01-01 11:00:00", length: 19},
    %{type: :text, value: "2025-11-03 14:18:03", length: 19},
    %{type: :text, value: "2025-11-03 14:18:03", length: 19}
  ],
  type: :insert,
  relation_id: 26378
}, lsn: "4/5A7D6730", latest_lsn: "4/5A7D6730"]
[type: :commit, payload: %{
  type: :commit,
  commit_lsn: "4/5A7D7E10",
  tx_end_lsn: "4/5A7D7E40"
}, lsn: "4/5A7D7E40", latest_lsn: "4/5A7D7E40"]

Let's discuss them in detail, starting at the top.

The BEGIN

[type: :begin, payload: %{
  type: :begin,
  lsn_end: "4/5A7D7E10",
  tx_id: 385169
}, lsn: "4/5A7D6730", latest_lsn: "4/5A7D6730"]

The BEGIN message starts the transaction, but if you compare the lsn_end and commit_lsn fields in the BEGIN and COMMIT message, you'll see that it already contains the LSN of the COMMIT message!

So, how does the BEGIN message know when the transaction will commit?

The reason for this is Postgres' ReorderBuffer which buffers WAL records before forwarding them to pgoutput.

The ReorderBuffer doesn't send the WAL records as they come in, but it first buffers and reorders them - hence the name - and only sends them to pgoutput when the transaction commits. If a transaction is rolled back, it won't forward the messages (see 49.6.4) and we'll never receive them either. The ReorderBuffer forwards messages in sequence one transaction at a time, not in parallel. When pgoutput receives these messages, it prepends a BEGIN message to indicate the beginning of a transaction, and sends them our way.

All of this means you don't have to worry about receiving messages out-of-order or interleaved with messages from other transactions (at least not if you use proto_version '1'), or handling rolled back vs committed transactions. Our listener will always receive ordered messages for one committed transaction at a time. Thank you Postgres!

The RELATION

[type: :relation, payload: %{
  type: :relation,
  columns: [
    %{name: "id", flag: 1, data_type_oid: 20, type_modifier: 4294967295},
    %{name: "title", flag: 0, data_type_oid: 1043, type_modifier: 259},
    %{name: "from", flag: 0, data_type_oid: 1114, type_modifier: 0},
    %{name: "until", flag: 0, data_type_oid: 1114, type_modifier: 0},
    %{name: "inserted_at", flag: 0, data_type_oid: 1114, type_modifier: 0},
    %{name: "updated_at", flag: 0, data_type_oid: 1114, type_modifier: 0}
  ],
  relation_id: 26378,
  namespace: "public",
  relation_name: "meetings",
  replica_identity_setting: :default
}, lsn: "0/0", latest_lsn: "0/0"]

pgoutput is kind enough to send us the schema of a table before sending us the changes that were applied to it. It sends the schema only once per session, but whenever we restart our connection, it sends it again.

The Relation message contains basic information about our table columns, like their name, whether they are part of the primary key (flag: 1), which data type they use (data_type_oid), and whether their data type is modified (type_modifier, which is equal to atttypmod), for example if a varchar column has a maximum length modifier.

It also contains the namespace and table name, which is handy for connecting the relation to an Ecto schema, as well as the relation_id, which is the table's Object Identifier (OID). This OID lets us connect the table with the INSERT message we will look at next.
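
By the way, if you ever want to resolve such an OID by hand, you can cast it to regclass in SQL:

-- Resolve the relation_id OID from the message to a table name
SELECT 26378::regclass;
-- meetings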

The INSERT

[type: :insert, payload: %{
  data: [
    %{type: :text, value: "55", length: 2},
    %{type: :text, value: "Test Meeting", length: 12},
    %{type: :text, value: "2025-01-01 10:00:00", length: 19},
    %{type: :text, value: "2025-01-01 11:00:00", length: 19},
    %{type: :text, value: "2025-11-03 14:18:03", length: 19},
    %{type: :text, value: "2025-11-03 14:18:03", length: 19}
  ],
  type: :insert,
  relation_id: 26378
}, lsn: "4/5A7D6730", latest_lsn: "4/5A7D6730"]

The Insert message is pretty straightforward. It contains the inserted data per column and points to the table into which the data was inserted through the relation_id field, which is the same OID as the relation we received before.

If you match the data values with the relation's columns, you'll see that the first value was inserted into the id field, the second value into the title field, and so on. Postgres splits the relation data from the change data to minimize bandwidth. We really don't need to receive the same table specs with every single event.

The COMMIT

[type: :commit, payload: %{
  type: :commit,
  commit_lsn: "4/5A7D7E10",
  tx_end_lsn: "4/5A7D7E40"
}, lsn: "4/5A7D7E40", latest_lsn: "4/5A7D7E40"]

Lastly, the COMMIT indicates the end of a transaction. Its commit_lsn is the LSN of the commit record, the same LSN the BEGIN message announced in its lsn_end field, and its tx_end_lsn points to itself. At this point, we know the transaction is complete and can process the messages we've received if we haven't done so already.

What to do with the data tho?

It's up to you! You can buffer the messages until you receive the final COMMIT, zip the data columns with the relation column definitions, and ... print them out? I guess? At least that's what I've done here. What you do with the data depends on your use-case. Be creative!
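
To make that concrete, here's a minimal sketch of the handle_message/2 stub from earlier, assuming the message shapes shown above: buffer everything until the COMMIT, then zip each INSERT's values with the column names from the matching RELATION message and print the result.

# Remember each table's schema when its RELATION message arrives.
defp handle_message(%{type: :relation, payload: relation}, state) do
  put_in(state, [:relations, relation.relation_id], relation)
end

# On COMMIT, flush the buffered INSERTs: zip each row's values with
# the column definitions of the matching relation and print them.
defp handle_message(%{type: :commit}, state) do
  for %{type: :insert, payload: insert} <- Enum.reverse(state.messages) do
    columns = state.relations[insert.relation_id].columns

    columns
    |> Enum.zip(insert.data)
    |> Map.new(fn {column, datum} -> {column.name, datum.value} end)
    |> IO.inspect(label: "Inserted row")
  end

  %{state | messages: []}
end

# Buffer everything else (BEGIN, INSERT, ...) until the COMMIT arrives.
defp handle_message(message, state) do
  %{state | messages: [message | state.messages]}
end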

Crash Recovery

We're almost at the end of this post, but one topic is important enough to delay that sweet, sweet relief of closing the tab, feeling like you've learned something, and then forgetting everything you've just read. Just bear with me.

One major consideration when tracking changes through the WAL is how to never miss a message, even if your listener crashes. Luckily, Postgres has got our back! As mentioned in the keep-alive message handler, Postgres will keep the WAL records until we acknowledge them, either in response to a keep-alive message, after we receive each message, or once we've received the final COMMIT message. Postgres tracks the acknowledged position in the slot's confirmed_flush_lsn, by the way; you can see it in the pg_replication_slots output above.

In any case, if your listener crashes before it can acknowledge the messages, Postgres will just send them again the next time our listener connects! At least if you use a persistent replication slot. You can test this by starting your application, creating a record, waiting for the WAL messages to come in, and then quickly stopping your application before the next keep-alive message comes in. If you now start your application again, your listener will receive the same messages once more!

This message recovery is great, but it also means that you must handle duplicate messages gracefully. So, however you process the messages, it must be idempotent.
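
What that looks like depends on your processing, but often the database itself can do the heavy lifting. For an audit log, for example, you could key each entry by its LSN and ignore conflicts, so a redelivered message can't create a duplicate. A sketch, assuming a hypothetical Wal.AuditLog schema with a unique index on :lsn:

# Upsert keyed by LSN: a redelivered WAL message hits the unique
# index and is silently ignored instead of creating a duplicate.
Wal.Repo.insert!(
  %Wal.AuditLog{lsn: message.lsn, change: message.payload},
  on_conflict: :nothing,
  conflict_target: :lsn
)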

And that's it! Enough about the WAL!

Further Considerations

Ha! I tricked you! Kind of.

Although this blog post was incredibly long, there's always more to learn about Postgres. Here are some pointers:

  1. Try out Walex or Cainophile which use the WAL for Change Data Capture (CDC) in Elixir.
  2. Peruse the Supabase Realtime for inspiration.
  3. Look into exporting a Snapshot when creating the Replication Slot, which captures the state of the database at the moment the slot is created.
  4. Call me a liar or tell me how much this post inspired you over on Bluesky. Your choice.

Conclusion

And that's it! I hope you enjoyed this article! If you want to support me, you can buy my book or video courses (one and two). Follow me on Bluesky or subscribe to my newsletter below if you want to get notified when I publish the next blog post. Until next time! Cheerio 👋
