Introduction
Change Data Capture (CDC) enables real-time tracking of database changes, which is critical for event-driven systems, analytics pipelines, and synchronizing microservices. This guide walks through implementing PostgreSQL CDC in Go using native logical replication and the pgx driver.
1. Understanding PostgreSQL CDC
PostgreSQL provides CDC via logical replication, which decodes changes from the write-ahead log (WAL) into consumable events (inserts/updates/deletes). Key concepts:
Replication Slots: Persistent channels for streaming changes.
Publications: Define which tables to monitor.
Logical Decoding Plugins: Convert WAL entries to readable formats (e.g., pgoutput, wal2json).
2. Prerequisites
a) PostgreSQL configured for replication
# postgresql.conf
wal_level = logical
max_replication_slots = 5
On GCP Cloud SQL, logical decoding is enabled through a database flag instead:
cloudsql.logical_decoding = on
b) Replication user
CREATE ROLE repl_user WITH LOGIN REPLICATION PASSWORD 'password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO repl_user;
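A CDC client can fail in confusing ways when these settings are missing, so a startup check can help. The following is a minimal sketch, assuming the setting values have already been read from the server (for example with SHOW wal_level). The helper name checkReplicationSettings and the map-based input are illustrative assumptions, not part of pgx or pglogrepl.

```go
package main

import (
	"fmt"
	"strconv"
)

// checkReplicationSettings is a hypothetical helper. It validates settings
// that logical replication depends on. The map is assumed to hold values
// fetched from the server beforehand; fetching them is out of scope here.
func checkReplicationSettings(settings map[string]string) []string {
	var problems []string
	if got := settings["wal_level"]; got != "logical" {
		problems = append(problems, fmt.Sprintf("wal_level is %q, want \"logical\"", got))
	}
	slots, err := strconv.Atoi(settings["max_replication_slots"])
	if err != nil || slots < 1 {
		problems = append(problems, "max_replication_slots must be at least 1")
	}
	return problems
}

func main() {
	problems := checkReplicationSettings(map[string]string{
		"wal_level":             "replica",
		"max_replication_slots": "5",
	})
	for _, p := range problems {
		fmt.Println(p)
	}
}
```

Returning a list of problems rather than the first error lets the caller report every misconfiguration in one pass.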
3. Implementing CDC in Go
Preamble:
pglogrepl is a Go library for interacting with PostgreSQL’s logical replication protocol. Built atop the github.com/jackc/pgx/v5/pgconn package, it lets Go applications connect to PostgreSQL and process logical replication messages, which makes it well suited to Change Data Capture (CDC) solutions, streaming data pipelines, and custom replication clients.
Note: replication=database in the DSN is required. It puts the connection into logical replication mode; without it the server rejects replication commands.
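import (
	"context"
	"fmt"
	"log"
	"os"
	"github.com/jackc/pglogrepl"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgproto3"
	"github.com/jackc/pgx/v5/pgtype"
)
// event is one decoded row change handed to consumers.
type event struct {
	Relation  string
	Columns   map[string]any
	Operation string
}
func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://repl_user:password@localhost:5432/dbname?replication=database")
	if err != nil {
		log.Fatalf("failed to connect: %v", err)
	}
	defer conn.Close(ctx)
	c := make(chan event)
	// Drain the channel; without a reader, the first dispatched event would block forever.
	go func() { for ev := range c { log.Printf("change: %+v", ev) } }()
	log.Fatal(ReadAndDefer(ctx, conn.PgConn(), c))
}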
Step 1: Start replication
func startReplication(ctx context.Context, conn *pgconn.PgConn) error {
	pubName := os.Getenv("PUBLICATION_NAME")
	slotName := os.Getenv("SLOT_NAME")
	// 1. Drop the publication if it already exists.
	if _, err := conn.Exec(ctx, fmt.Sprintf("DROP PUBLICATION IF EXISTS %s;", pubName)).ReadAll(); err != nil {
		return err
	}
	// 2. Create the publication for all tables.
	if _, err := conn.Exec(ctx, fmt.Sprintf("CREATE PUBLICATION %s FOR ALL TABLES;", pubName)).ReadAll(); err != nil {
		return err
	}
	// 3. Create a temporary replication slot; the server drops it when the session ends.
	if _, err := pglogrepl.CreateReplicationSlot(ctx, conn, slotName, os.Getenv("OUTPUT_PLUGIN"), pglogrepl.CreateReplicationSlotOptions{Temporary: true}); err != nil {
		return err
	}
	// These options are specific to the pgoutput plugin; 'messages' needs PostgreSQL 14 or later.
	pluginArguments := []string{
		"proto_version '1'",
		fmt.Sprintf("publication_names '%s'", pubName),
		"messages 'true'",
	}
	// 4. Start streaming. pglogrepl.ParseLSN returns (LSN, error) and cannot be passed
	// inline, so the start position "0/0" is written as LSN(0).
	return pglogrepl.StartReplication(ctx, conn, slotName, pglogrepl.LSN(0), pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
}
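The publication name above is interpolated into SQL with fmt.Sprintf, which breaks (or is unsafe) if the name contains unusual characters. The following is a minimal sketch of quoting identifiers with the standard PostgreSQL rule (wrap in double quotes, double any embedded quote). The helper names quoteIdent and createPublicationSQL are illustrative assumptions, and the table names are assumed to be unqualified; a schema-qualified name would need each part quoted separately.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent is a hypothetical helper that quotes a single SQL identifier.
// Note that quoted identifiers are case-sensitive in PostgreSQL.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// createPublicationSQL builds a CREATE PUBLICATION statement. With no tables
// it publishes all tables, as in the step above; otherwise only those listed.
func createPublicationSQL(pubName string, tables ...string) string {
	if len(tables) == 0 {
		return fmt.Sprintf("CREATE PUBLICATION %s FOR ALL TABLES;", quoteIdent(pubName))
	}
	quoted := make([]string, len(tables))
	for i, t := range tables {
		quoted[i] = quoteIdent(t)
	}
	return fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s;", quoteIdent(pubName), strings.Join(quoted, ", "))
}

func main() {
	fmt.Println(createPublicationSQL("cdc_pub"))
	fmt.Println(createPublicationSQL("cdc_pub", "users", "orders"))
}
```

Publishing only the tables a consumer needs, rather than FOR ALL TABLES, also reduces the volume of changes that must be decoded and sent.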
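Step 2: Receive messages
func ReadAndDefer(ctx context.Context, conn *pgconn.PgConn, c chan event) error {
	if err := startReplication(ctx, conn); err != nil {
		return err
	}
	relations := map[uint32]*pglogrepl.RelationMessage{}
	typeMap := pgtype.NewMap()
	var clientXLogPos pglogrepl.LSN // last WAL position processed by this client
	for {
		rawMsg, err := conn.ReceiveMessage(ctx)
		if err != nil {
			return err
		}
		switch msg := rawMsg.(type) {
		case *pgproto3.CopyData:
			switch msg.Data[0] {
			case pglogrepl.PrimaryKeepaliveMessageByteID:
				// Parse the keepalive and answer it when the server asks for a reply;
				// otherwise the server eventually closes the connection.
				pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])
				if err != nil {
					log.Printf("failed to parse keepalive: %v", err)
					continue
				}
				if pkm.ReplyRequested {
					status := pglogrepl.StandbyStatusUpdate{WALWritePosition: clientXLogPos}
					if err := pglogrepl.SendStandbyStatusUpdate(ctx, conn, status); err != nil {
						return err
					}
				}
			case pglogrepl.XLogDataByteID:
				walLog, err := pglogrepl.ParseXLogData(msg.Data[1:])
				if err != nil {
					log.Printf("failed to parse logical WAL log: %v", err)
					continue // walLog is unusable, skip this message
				}
				if err := parseAndDispatchMessage(&walLog, relations, typeMap, c); err != nil {
					log.Printf("failed to parse and dispatch message: %v", err)
				}
				// Advance the client position past the data just handled.
				clientXLogPos = walLog.WALStart + pglogrepl.LSN(len(walLog.WALData))
			}
		default:
			log.Printf("received unexpected message: %T\n", msg)
		}
	}
}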
Step 3: Parse messages
// decodeTextColumnData converts a text-format column value to a Go value
// using the codec registered for its type OID, falling back to a plain string.
func decodeTextColumnData(mi *pgtype.Map, data []byte, dataType uint32) (any, error) {
	if dt, ok := mi.TypeForOID(dataType); ok {
		return dt.Codec.DecodeValue(mi, dataType, pgtype.TextFormatCode, data)
	}
	return string(data), nil
}
func parseAndDispatchMessage(walLog *pglogrepl.XLogData, relations map[uint32]*pglogrepl.RelationMessage, typeMap *pgtype.Map, c chan event) error {
	msg, err := pglogrepl.Parse(walLog.WALData)
	if err != nil {
		return err
	}
	switch m := msg.(type) {
	case *pglogrepl.RelationMessage:
		// Cache table metadata; later row messages refer to it by RelationID.
		relations[m.RelationID] = m
	case *pglogrepl.InsertMessage:
		rel, ok := relations[m.RelationID]
		if !ok {
			log.Printf("relation %d not found", m.RelationID)
			break
		}
		values := map[string]any{}
		for idx, col := range m.Tuple.Columns {
			colName := rel.Columns[idx].Name
			switch col.DataType {
			case 'n': // null
				values[colName] = nil
			case 't': // text
				val, err := decodeTextColumnData(typeMap, col.Data, rel.Columns[idx].DataType)
				if err != nil {
					return fmt.Errorf("decode column %q: %w", colName, err)
				}
				values[colName] = val
			}
		}
		// Send the event once, after every column has been collected.
		c <- event{
			Relation:  rel.RelationName,
			Columns:   values,
			Operation: "insert",
		}
	case *pglogrepl.UpdateMessage:
		// ...
	case *pglogrepl.DeleteMessage:
		// ...
	}
	return nil
}
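When the elided update case is filled in, one detail matters beyond what the insert case shows: pgoutput marks a TOASTed value that did not change with the kind 'u' and sends no data for it. The following is a minimal sketch of handling that, using a simplified stand-in type (tupleColumn) rather than the pglogrepl types; the helper name tupleToMap and the choice to omit unchanged columns from the result are assumptions made for illustration.

```go
package main

import "fmt"

// tupleColumn is a simplified stand-in for one decoded tuple column.
// Kind is 'n' (null), 't' (text value), or 'u' (unchanged TOASTed value).
type tupleColumn struct {
	Kind byte
	Data []byte
}

// tupleToMap is a hypothetical helper that converts a tuple to a map keyed by
// column name. Unchanged TOASTed columns are left out of the map, so a
// consumer can tell "set to NULL" (key present, nil value) apart from
// "not sent" (key absent).
func tupleToMap(names []string, cols []tupleColumn) map[string]any {
	values := make(map[string]any, len(cols))
	for i, col := range cols {
		if i >= len(names) {
			break // more columns than known names; ignore the rest
		}
		switch col.Kind {
		case 'n':
			values[names[i]] = nil
		case 't':
			values[names[i]] = string(col.Data)
		case 'u':
			// value not transmitted; keep whatever the consumer already has
		}
	}
	return values
}

func main() {
	names := []string{"id", "bio", "nickname"}
	cols := []tupleColumn{
		{Kind: 't', Data: []byte("1")},
		{Kind: 'u'},
		{Kind: 'n'},
	}
	values := tupleToMap(names, cols)
	_, bioSent := values["bio"]
	fmt.Println(values["id"], bioSent, len(values))
	// prints: 1 false 2
}
```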
4. Handling Decoded Changes
The pglogrepl library parses raw WAL data into structured messages. For example, an InsertMessage contains:
RelationID: Table identifier
Tuple: New row values
Columns: The tuple's column values; column metadata (names and types) comes from the RelationMessage with the same RelationID
Example Output:
INSERT: {RelationID: 16393, Tuple: [{Column: user_id, Value: 123}, ...]}
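Once changes are decoded, something has to consume them. The following is a minimal sketch of a consumer that drains a channel of events and serializes each one to JSON, for example before publishing it to a queue. The event struct is repeated so the example is self-contained; the function name encodeEvents and the choice of JSON are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event mirrors the struct used by the replication code in this guide.
type event struct {
	Relation  string
	Columns   map[string]any
	Operation string
}

// encodeEvents is a hypothetical consumer: it reads events until the channel
// is closed and returns one JSON document per event.
func encodeEvents(c <-chan event) ([]string, error) {
	var out []string
	for ev := range c {
		b, err := json.Marshal(ev)
		if err != nil {
			return out, err
		}
		out = append(out, string(b))
	}
	return out, nil
}

func main() {
	c := make(chan event, 1)
	c <- event{Relation: "users", Columns: map[string]any{"user_id": 123}, Operation: "insert"}
	close(c)

	lines, err := encodeEvents(c)
	if err != nil {
		panic(err)
	}
	for _, l := range lines {
		fmt.Println(l)
	}
	// prints: {"Relation":"users","Columns":{"user_id":123},"Operation":"insert"}
}
```

In a long-running service the consumer would typically run in its own goroutine and forward each document as it arrives rather than collecting them in a slice.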
Drawbacks
No automatic schema replication: DDL changes (ALTER TABLE ...) are not replicated and must be applied manually on subscribers.
Replica identity requirement: to replicate UPDATE and DELETE, a table needs a replica identity, which is its primary key by default (a suitable unique index or REPLICA IDENTITY FULL also works).
Large objects: PostgreSQL large objects (BLOB/CLOB-style data) are not replicated.
Non-table objects: Views, materialized views, and foreign tables are excluded.
Complexity: handling the replication protocol yourself is more work than simpler alternatives, so the tradeoff is performance versus maintenance.
Conclusion
PostgreSQL's logical replication paired with Go's pgx provides a robust foundation for building CDC pipelines. By streaming changes directly from the WAL, you achieve low-latency access to database events while minimizing performance overhead.
This implementation gives you full control over change processing, making it ideal for building reactive systems that respond instantly to database changes.