Modern applications generate a lot of data, mostly stored in ACID-compliant OLTP systems like Postgres, MySQL, SQLite etc. Once collected, this data needs to be queried and processed for analytics or other purposes to get meaningful insights for business decision-making. The data could be spread across different databases, and accessing all of it means running queries against multiple databases. This is inefficient and adds extra complexity to the system.
One simple solution is to use a central OLTP database for collating all the data and handling all your analytical queries. It can be another Postgres instance acting as a data warehouse. This works but has a few issues, such as hard requirements on the shape of the data (it must be structured), scaling limits etc.
Data lakes offer a different solution. A data lake provides storage for all kinds of content: files in different formats (e.g. CSV, JSON, Apache Parquet, Avro, PDF), images, videos etc. The data can be stored in its raw form and accessed later for further processing (as in Extract Load Transform) for analytics or machine learning purposes. Data lakes are usually built on object storage like S3, Google Cloud Storage, Azure Blob Storage, MinIO etc. The standout advantages of a data lake are the promise of near-limitless storage and the ability to store virtually anything. Once data is in the data lake and the necessary transformations have been applied, it needs to be exposed to various stakeholders (machine learning engineers, analysts, “AI people”, execs etc) to derive business value from it.
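For a concrete sense of what storing raw data in an object store looks like, here is a small Python sketch that writes a Parquet file into an S3-compatible store such as MinIO using pyarrow and s3fs. The bucket name, endpoint and credentials are illustrative placeholders, not part of the setup described later in this post.

import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

# Hypothetical MinIO endpoint and credentials; replace with your own.
fs = s3fs.S3FileSystem(
    key="minio_user",
    secret="pass1234",
    client_kwargs={"endpoint_url": "http://localhost:9000"},
)

# Any tabular raw data can be dumped as-is into the lake and processed later (ELT).
table = pa.table({"id": [1, 2], "name": ["Sam Evans", "James Bond"]})
pq.write_table(table, "warehouse/raw/employees.parquet", filesystem=fs)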
Data warehouses built on “traditional” OLTP databases like Postgres make this part easier, since they already organize data in tables and come with built-in SQL query engines. For a data lake, where the data to be queried is scattered across multiple files, querying gets complicated. The data lake for the most part only gives you storage; you have to bring your own compute layer to query and process the data. Luckily, a number of solutions already exist.
A common solution is to use an open table format like Apache Iceberg (others are Delta Lake and Apache Hudi). With these tools you get the benefits of traditional database functionality on your data lake, i.e. ACID guarantees and transactions. The Iceberg specification defines an open table format that lets related data, stored in separate files in a distributed storage system, be accessed as one table.
Iceberg Design
The design for Iceberg is as shown below:
The design is pretty simple. The catalog holds metadata about the tables managed by Iceberg. A snapshot holds the state of a table at a point in time; any change to the table results in a new snapshot being generated. Each snapshot points to a manifest list, which is a list of manifest files, and each data file is tracked by a manifest file.
Iceberg is able to efficiently manage large amounts of data stored in the data lake. The data layer supports storing data in open formats like Apache Parquet or Avro. Apache Parquet is an open columnar data format for efficient data storage and retrieval. With this, you automatically get the benefits of columnar storage for your analytical workloads. Engines like Apache Spark, Apache Flink, Presto, Trino etc can be used as the compute layer for querying and processing the data.
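To make the hierarchy more concrete, here is a minimal PyIceberg sketch against a SQL catalog backed by Postgres and a MinIO warehouse (the same kind of setup used later in this post). All names and connection details are illustrative assumptions, not a definitive configuration.

from pyiceberg.catalog.sql import SqlCatalog

# Hypothetical catalog and object store settings; adjust to your environment.
catalog = SqlCatalog(
    "example_cat",
    uri="postgresql+psycopg2://repl_user:pass1234@localhost:5432/iceberg",
    warehouse="s3://warehouse",
    **{
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "minio_user",
        "s3.secret-access-key": "pass1234",
    },
)

# Catalog -> table -> snapshots -> manifest list -> manifest files -> data files.
table = catalog.load_table("example_schema.employee")
for snap in table.snapshots():
    print(snap.snapshot_id, snap.manifest_list)  # each snapshot points at a manifest list

# Related Parquet files in the object store are read back as one table.
print(table.scan().to_arrow().num_rows)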
There are two methods for getting data into the data lake: batching it or streaming it directly from the source. Batching is less complicated and can be scheduled, for example nightly. Streaming, on the other hand, is more involved and requires more effort to get right. A common setup is Debezium for change data capture (CDC) with an event streaming platform like Apache Kafka or Redpanda, as sketched below.
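As an illustration of that streaming route (it is not the one used in the rest of this post), here is a hedged Python sketch of consuming Debezium change events from Kafka with confluent-kafka. The broker address, topic name and consumer group are assumptions, and the event layout follows Debezium's default JSON envelope.

import json
from confluent_kafka import Consumer

# Hypothetical broker and topic; Debezium publishes one topic per table,
# typically named <topic.prefix>.<schema>.<table>.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "lake-loader",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["pgserver.public.employee"])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    event = json.loads(msg.value())
    payload = event["payload"]  # Debezium envelope: op, before, after, source, ts_ms
    # op is "c" (insert), "u" (update) or "d" (delete); hand the row off to the
    # lake writer of your choice here.
    print(payload["op"], payload.get("after"))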
In the remainder of this post, we’ll go through setting up a simple pipeline to stream data from a Postgres server to an Iceberg table. The setup will use Postgres (we love Postgres here) as a SQL catalog, MinIO, an open source S3-compatible object store, Apache Parquet for the data files, and Trino as the processing engine (compute layer).
Change Data Capture Tool
We’ll also use a simple CDC tool I have been working on. It implements the Postgres wire protocol and uses logical replication to capture DML changes and stream them to the data lake. It is written mostly in Rust, but the part that uploads data to the data lake is done in Python. This makes it easier to do any additional processing of the data in Python before uploading it to the data lake (ETL), or, as in ELT, to upload the data as is and perform the transformations later. The Python data ecosystem is rich and provides a lot of excellent tools for working with data.
Streaming can result in an explosion of very small files in the data lake. Transferring every record captured by the CDC tool also causes a lot of network chatter and slows down data processing. It helps to buffer the data locally and only send it over the network in larger batches; this can speed up the pipeline (data is first saved on local disk) and reduce network costs. pg_rusted_wire uses segments, which are fixed-size file chunks, to accumulate data received from Postgres. Data in the active segment is uploaded to the data lake once the segment reaches the configured size or when the configured upload interval elapses. Both the interval and segment size can be adjusted with the constants MAX_WRITE_INTERVAL and MAX_SEGMENT_SIZE respectively, in wire.rs. The sketch below illustrates the idea.
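To illustrate the pattern only (the actual implementation lives in wire.rs and is written in Rust), here is a rough Python sketch of buffer-and-flush. The limits and the upload callback are stand-ins, not the tool's real values.

import time

# Illustrative limits only; the real MAX_SEGMENT_SIZE and MAX_WRITE_INTERVAL
# live in wire.rs and may differ.
MAX_SEGMENT_SIZE = 8 * 1024 * 1024   # bytes
MAX_WRITE_INTERVAL = 30.0            # seconds

class SegmentBuffer:
    """Accumulate CDC records locally and upload them in larger batches."""

    def __init__(self, upload):
        self.upload = upload                 # callback that ships a batch to the lake
        self.records, self.size = [], 0
        self.last_upload = time.monotonic()

    def append(self, record: bytes):
        self.records.append(record)
        self.size += len(record)
        full = self.size >= MAX_SEGMENT_SIZE
        stale = time.monotonic() - self.last_upload >= MAX_WRITE_INTERVAL
        if full or stale:
            self.upload(self.records)        # one network round trip for many records
            self.records, self.size = [], 0
            self.last_upload = time.monotonic()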
Setting up the Data Lake
To begin, clone the repository here and execute the run.sh bash script. Once the script has completed, you should see three services running with a healthy status; Trino takes a few seconds (~20s) to start up and transition to a healthy state. Navigate to http://localhost:9001 in your browser to open the MinIO object storage UI. Log in with the username minio_user and password pass1234.
The default bucket warehouse should be empty. A new catalog called example_cat has also been created. The catalog should be empty since we have not added any tables, which we can confirm from the catalog store in Postgres:
$ docker exec -it iceberg-trino-example-metastore-1 psql -U repl_user -d iceberg
psql (18.0)
Type "help" for help.
iceberg=# table iceberg_tables; -- Empty
catalog_name | table_namespace | table_name | metadata_location | previous_metadata_location
--------------+-----------------+------------+-------------------+----------------------------
(0 rows)
iceberg=# table iceberg_namespace_properties; -- Empty
catalog_name | namespace | property_key | property_value
--------------+-----------+--------------+----------------
(0 rows)
iceberg=#
We can now create a schema on Iceberg to hold our table. We also create a new table called employee to store employee details. The Trino processing engine will be used to perform these actions.
$ docker exec -it iceberg-trino-example-trino_connector-1 trino
trino> CREATE SCHEMA example_cat.example_schema WITH (location = 's3://warehouse/test');
CREATE SCHEMA
trino> CREATE TABLE example_cat.example_schema.employee
(
id INTEGER,
name VARCHAR,
salary DECIMAL(10,2)
)
WITH (
format = 'PARQUET'
);
CREATE TABLE
trino>
Now we have a catalog named example_cat, a schema example_schema and a table employee. Accessing the table requires the full path in the <catalog_name>.<schema_name>.<table_name> format; in our case, that is example_cat.example_schema.employee.
The bucket created earlier should now have some data in it. These are metadata files describing the state of the table that has just been created. One is a metadata file and the other is a snapshot file (prefixed with “snap”). We don’t have a manifest file yet since we don’t have any data yet.
Let’s add some data to the table:
$ docker exec -it iceberg-trino-example-trino_connector-1 trino
trino> INSERT INTO example_cat.example_schema.employee (id, name, salary) VALUES (1, 'Sam Evans', 55000);
INSERT INTO example_cat.example_schema.employee (id, name, salary) VALUES (2, 'James Bond', 12000);
INSERT: 1 row
Query 20251031_121000_00003_dw2kt, FINISHED, 1 node
Splits: 23 total, 23 done (100.00%)
2.66 [0 rows, 0B] [0 rows/s, 0B/s]
INSERT: 1 row
Query 20251031_121003_00004_dw2kt, FINISHED, 1 node
Splits: 23 total, 23 done (100.00%)
0.78 [0 rows, 0B] [0 rows/s, 0B/s]
trino> SELECT * FROM example_cat.example_schema.employee; -- Query the data
id | name | salary
----+------------+----------
1 | Sam Evans | 55000.00
2 | James Bond | 12000.00
(2 rows)
Query 20251031_121103_00005_dw2kt, FINISHED, 1 node
Splits: 2 total, 2 done (100.00%)
0.57 [2 rows, 1KB] [3 rows/s, 1.77KB/s]
trino> exit
More metadata files have been generated. We can browse them in the object store UI.
A new directory named data, which stores the data files, has also been created.
There are two Parquet data files. We inserted the two records in separate transactions, and each transaction created its own data file and metadata files: when the state of a table changes (insert, update, delete), a new snapshot is created to track the new table state, along with a new metadata file (which stores the location of the snapshot) and a new manifest file (to track the new data file). We can also inspect this programmatically, as sketched below.
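If you prefer code to clicking through the MinIO UI, here is a short, hedged PyIceberg sketch that lists the snapshots. The connection details are the same illustrative placeholders as in the earlier sketch; each INSERT above should show up as its own append snapshot with its own manifest list.

from pyiceberg.catalog.sql import SqlCatalog

# Same hypothetical connection settings as in the earlier sketch.
catalog = SqlCatalog(
    "example_cat",
    uri="postgresql+psycopg2://repl_user:pass1234@localhost:5432/iceberg",
    warehouse="s3://warehouse",
    **{"s3.endpoint": "http://localhost:9000",
       "s3.access-key-id": "minio_user",
       "s3.secret-access-key": "pass1234"},
)
table = catalog.load_table("example_schema.employee")

# One snapshot per committed transaction; each one points at its own manifest list.
for snap in table.snapshots():
    operation = snap.summary.operation if snap.summary else None
    print(snap.snapshot_id, operation, snap.manifest_list)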
We can drop the table to clear all the files associated with it, including the metadata files and data files:
$ docker exec -it iceberg-trino-example-trino_connector-1 trino
trino> DROP TABLE example_cat.example_schema.employee;
DROP TABLE
trino> exit
Streaming Data to Iceberg
We can now set up a simple pipeline with pg_rusted_wire to stream data from Postgres to Iceberg. In this example, we will use the same Postgres instance both as the catalog store and as the data source. The tool will use logical streaming replication to capture table changes from Postgres.
First, prepare the data source for logical streaming.
$ docker exec -it iceberg-trino-example-metastore-1 psql -U repl_user -d iceberg -c "ALTER SYSTEM SET wal_level = 'logical'"
ALTER SYSTEM
$ docker restart iceberg-trino-example-metastore-1
iceberg-trino-example-metastore-1
Then create the employee table from which we will get data. We also create a publication on the table, to which the CDC tool will subscribe in order to capture changes made to the table.
$ docker exec -it iceberg-trino-example-metastore-1 psql -U repl_user -d postgres
postgres=# CREATE TABLE employee
( id serial primary key,
name varchar,
salary decimal(10,2)
);
CREATE TABLE
postgres=# INSERT INTO employee (name, salary) select 'Mkamze Mwatela' || i, i*200 from generate_series(1, 100000) i;
INSERT 0 100000
postgres=# CREATE PUBLICATION pub1 FOR TABLE employee;
CREATE PUBLICATION
postgres=# exit
The employee table has been created and has 100k records.
Let’s also recreate the employee table on Iceberg. The data streamed from Postgres will be stored in this table.
$ docker exec -it iceberg-trino-example-trino_connector-1 trino
trino> CREATE TABLE example_cat.example_schema.employee
(
id INTEGER,
name VARCHAR,
salary DECIMAL(10,2)
)
WITH (
format = 'PARQUET'
);
CREATE TABLE
trino> exit
Next, create a file called config in a directory of your choosing, e.g. /home/your_user/.tmp/config. The file contains credentials for connecting to the object store and the catalog store. Copy the details below into the file.
Use the command ifconfig docker0 | grep -w "inet" | awk '{print $2}' to get the host Docker IP address. Replace the host_ip below with the output from the command.
S3_SECRET_KEY='pass1234'
S3_ENDPOINT='http://host_ip:9000'
S3_ACCESS_KEY='minio_user'
CATALOG_URI='postgresql+psycopg2://repl_user:pass1234@host_ip:5432/iceberg'
TABLE_NAME='example_cat.example_schema.employee'
Open a new terminal window and clone the repo at https://github.com/misachi/pg_rusted_wire. Run the example CDC tool as shown below.
The /home/your_user/.tmp/ path should be the directory that has the config file described above.
$ git clone https://github.com/misachi/pg_rusted_wire.git && cd pg_rusted_wire
$ DOCKER_IP=`ifconfig docker0 | grep -w "inet" | awk '{print $2}'` && cargo run --example lrepl -- -u repl_user -P pass1234 -H $DOCKER_IP -d postgres -p 5432 --table employee --publication pub1 --config-dir /home/your_user/.tmp/
If there are no errors, then after a few seconds (depending on the configured MAX_WRITE_INTERVAL setting) the initial data in the employee table should have been copied to the employee Iceberg table. We can check using Trino with the SQL command below:
$ docker exec -it iceberg-trino-example-trino_connector-1 trino
trino> SELECT COUNT(*) FROM example_cat.example_schema.employee;
_col0
--------
100000
(1 row)
Query 20251102_201645_00003_ack9h, FINISHED, 1 node
Splits: 10 total, 10 done (100.00%)
0.41 [100K rows, 0B] [242K rows/s, 0B/s]
All the 100k records in the Postgres table have been copied to Iceberg.
Inserts
We can then insert additional data into our Postgres employee table and it will be streamed to Iceberg:
$ docker exec -it iceberg-trino-example-metastore-1 psql -U repl_user -d postgres -c "INSERT INTO employee (name, salary) select 'Manjaz' || i, i*200 from generate_series(1, 5) i"
INSERT 0 5
Five more records have been inserted in the table. The records count on Iceberg table should now be 100,005.
$ docker exec -it iceberg-trino-example-trino_connector-1 trino
trino> SELECT COUNT(*) FROM example_cat.example_schema.employee;
_col0
--------
100005
(1 row)
Query 20251102_202608_00004_ack9h, FINISHED, 1 node
Splits: 11 total, 11 done (100.00%)
0.40 [100K rows, 0B] [253K rows/s, 0B/s]
trino> exit
The object storage web UI now shows something like this:
Two data files have been generated. One is from the initial copy of 100k records and is 1.2 MB in size. The second data file holds the 5 records inserted later and is 1.3 KB.
Updates
When a record is updated in the Postgres table, the change is captured and streamed to Iceberg.
$ docker exec -it iceberg-trino-example-metastore-1 psql -U repl_user -d postgres
psql (18.0)
Type "help" for help.
postgres=# SELECT * FROM employee WHERE name like '%Manjaz%'; -- Initial salary records for the 5 employees
id | name | salary
--------+---------+---------
100001 | Manjaz1 | 200.00
100002 | Manjaz2 | 400.00
100003 | Manjaz3 | 600.00
100004 | Manjaz4 | 800.00
100005 | Manjaz5 | 1000.00
(5 rows)
postgres=# UPDATE employee SET salary = 4000 WHERE name like '%Manjaz%';
UPDATE 5
postgres=# SELECT * FROM employee WHERE name like '%Manjaz%';
id | name | salary
--------+---------+---------
100001 | Manjaz1 | 4000.00
100002 | Manjaz2 | 4000.00
100003 | Manjaz3 | 4000.00
100004 | Manjaz4 | 4000.00
100005 | Manjaz5 | 4000.00
(5 rows)
postgres=# exit
The employees with names that start with Manjaz have all been updated.
$ docker exec -it iceberg-trino-example-trino_connector-1 trino
trino> SELECT COUNT(*) FROM example_cat.example_schema.employee;
_col0
--------
100005
(1 row)
Query 20251102_204321_00012_ack9h, FINISHED, 1 node
Splits: 11 total, 11 done (100.00%)
0.51 [100K rows, 0B] [195K rows/s, 0B/s]
trino> SELECT * FROM example_cat.example_schema.employee WHERE name like '%Manjaz%';
id | name | salary
--------+---------+---------
100001 | Manjaz1 | 4000.00
100002 | Manjaz2 | 4000.00
100003 | Manjaz3 | 4000.00
100004 | Manjaz4 | 4000.00
100005 | Manjaz5 | 4000.00
(5 rows)
Query 20251102_204403_00013_ack9h, FINISHED, 1 node
Splits: 2 total, 2 done (100.00%)
1.24 [100K rows, 1.21MB] [80.4K rows/s, 996KB/s]
trino> exit
The updates to the employee table in Postgres have been successfully streamed to the Iceberg table.
The MinIO object storage browser now shows 3 data files. All the updated records were in the data file named 00000-0-14307206-2120-4897-a3ef-fddd94900de2.parquet, since they were created in a single transaction. A new data file named 00000-0-5af272e2-4e18-4828-a66f-b6667e982e16.parquet has been created as a result of the update. UPDATEs are done in copy-on-write mode by default: data files are not modified in place; instead, the affected data is read, updated in memory and written out to a new file. Because the old files stay around, earlier snapshots remain queryable, as the sketch below shows.
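A hedged PyIceberg sketch of time travel into the pre-update snapshot. It reuses the same illustrative connection settings as the earlier snippets and assumes snapshots are listed oldest first, so the second-to-last snapshot is the one taken just before the UPDATE.

from pyiceberg.catalog.sql import SqlCatalog

# Same hypothetical connection settings as before.
catalog = SqlCatalog(
    "example_cat",
    uri="postgresql+psycopg2://repl_user:pass1234@localhost:5432/iceberg",
    warehouse="s3://warehouse",
    **{"s3.endpoint": "http://localhost:9000",
       "s3.access-key-id": "minio_user",
       "s3.secret-access-key": "pass1234"},
)
table = catalog.load_table("example_schema.employee")

# The snapshot committed just before the UPDATE.
previous = table.snapshots()[-2].snapshot_id

# Scan the older snapshot; the Manjaz rows (ids 100001-100005) should still
# show their original salaries because copy-on-write kept the old data file.
old_rows = table.scan(row_filter="id >= 100001", snapshot_id=previous).to_arrow()
print(old_rows.to_pydict()["salary"])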
Deletes
Deleting records works similarly to updates and results in records being removed from the Iceberg table. Since deletes change the state of the table, a new snapshot, and hence a new metadata file, will be generated. A new data file is also created, with a new manifest file pointing to it.
Postgres is purpose-built to handle OLTP workloads and does that really well. Its row-based storage design might not work well for all analytical workloads, depending on your specific needs and data size. OLAP data stores designed specifically for analytics come in handy when Postgres no longer meets your needs. We have shown one such solution: a data lake for storing your raw data in separate files, with Apache Iceberg treating related files as a single table to run your queries against. A data lake’s ability to handle large, diverse datasets makes it an essential foundation for modern data-driven decision-making. When combined with open table formats like Apache Iceberg, data lakes become even more powerful, offering reliable data management, faster queries, and a scalable path toward advanced analytics.