2025-12-18
8 min read

When you’re dealing with large amounts of data, it’s helpful to get a quick overview — which is exactly what aggregations provide in SQL. Aggregations, known as “GROUP BY queries”, provide a bird’s eye view, so you can quickly gain insights from vast volumes of data.
That’s why we are excited to announce support for aggregations in R2 SQL, Cloudflare’s serverless, distributed, analytics query engine, which is capable of running SQL queries over data stored in R2 Data Catalog. Aggregations will allow users of R2 SQL to spot important trends and changes in the data, generate reports and find anomalies in logs.
This release builds on the already supported filter queries, which are foundational for analytical workloads, and allow users to find needles in haystacks of Apache Parquet files.
In this post, we’ll unpack the utility and quirks of aggregations, and then dive into how we extended R2 SQL to support running such queries over vast amounts of data stored in R2 Data Catalog.
The importance of aggregations in analytics
Aggregations, or “GROUP BY queries”, generate a short summary of the underlying data.
A common use case for aggregations is generating reports. Consider a table called “sales”, which contains historical data of all sales across various countries and departments of some organisation. One could easily generate a report on the volume of sales by department using this aggregation query:
SELECT department, sum(value)
FROM sales
GROUP BY department
The “GROUP BY” statement allows us to split table rows into buckets. Each bucket has a label corresponding to a particular department. Once the buckets are full, we can then calculate “sum(value)” for all rows in each bucket, giving us the total volume of sales performed by the corresponding department.
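The bucket idea can be sketched in a few lines of Python. This is only an illustration of what the query means, not how R2 SQL executes it; the sample rows and the `sum_by_department` helper are made up for this example.

```python
from collections import defaultdict

# Hypothetical rows from the "sales" table: (department, value).
sales = [
    ("Engineering", 120.0),
    ("Marketing", 80.0),
    ("Engineering", 30.0),
    ("Support", 45.0),
    ("Marketing", 20.0),
]

def sum_by_department(rows):
    """Rough equivalent of: SELECT department, sum(value) FROM sales GROUP BY department."""
    buckets = defaultdict(float)  # one bucket per department label
    for department, value in rows:
        buckets[department] += value  # fold each row into its bucket
    return dict(buckets)

totals = sum_by_department(sales)
```

Here `totals` holds one entry per department, each carrying the running sum of the rows that landed in that bucket.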
For some reports, we might only be interested in departments that had the largest volume. That’s where an “ORDER BY” statement comes in handy:
SELECT department, sum(value)
FROM sales
GROUP BY department
ORDER BY sum(value) DESC
LIMIT 10
Here we instruct the query engine to sort all department buckets by their total sales volume in descending order and return only the top 10.
Finally, we might be interested in filtering out anomalies. For example, we might want to only include departments that had more than five sales total in our report. We can easily do that with a “HAVING” statement:
SELECT department, sum(value), count(*)
FROM sales
GROUP BY department
HAVING count(*) > 5
ORDER BY sum(value) DESC
LIMIT 10
Here we added a new aggregate function to our query — “count(*)” — which calculates how many rows ended up in each bucket. This directly corresponds to the number of sales in each department, so we have also added a predicate in the “HAVING” statement to make sure that we only leave buckets with more than five rows in them.
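To make the order of operations concrete, here is a small Python sketch of the same query: bucket the rows, apply the “HAVING” predicate to each bucket, sort, then truncate. The `top_departments` function and the sample rows are hypothetical and exist only to illustrate the semantics.

```python
from collections import defaultdict

def top_departments(rows, min_sales=5, limit=10):
    # GROUP BY department: track sum(value) and count(*) per bucket.
    buckets = defaultdict(lambda: [0.0, 0])
    for department, value in rows:
        buckets[department][0] += value
        buckets[department][1] += 1
    # HAVING count(*) > min_sales: drop buckets with too few rows.
    kept = [(d, total, count) for d, (total, count) in buckets.items() if count > min_sales]
    # ORDER BY sum(value) DESC
    kept.sort(key=lambda row: row[1], reverse=True)
    # LIMIT
    return kept[:limit]

# Hypothetical data: "B" has the largest total but only two sales.
rows = [("A", 10.0)] * 6 + [("B", 100.0)] * 2 + [("C", 1.0)] * 7
report = top_departments(rows)
```

Department "B" is filtered out despite its large total, because the predicate is evaluated on the bucket's row count before sorting.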
Two approaches to aggregation: compute sooner or later
Aggregation queries have a curious property: they can reference columns that are not stored anywhere. Consider “sum(value)”: this column is computed by the query engine on the fly, unlike the “department” column, which is fetched from Parquet files stored on R2. This subtle difference means that any query that references aggregates like “sum”, “count” and others needs to be split into two phases.
The first phase is computing new columns. If we are to sort the data by the “count(*)” column using an “ORDER BY” statement, or filter rows based on it using a “HAVING” statement, we need to know the values of this column. Once the values of columns like “count(*)” are known, we can proceed to the second phase: the rest of the query execution.
Note that if the query does not reference aggregate functions in “HAVING” or “ORDER BY”, but still uses them in “SELECT”, we can make use of a trick. Since we do not need the values of aggregate functions until the very end, we can compute them partially and merge results just before we are about to return them to the user.
The key difference between the two approaches is when we compute aggregate functions: in advance, to perform some additional computations on them later; or on the fly, to iteratively build results the user needs.
First, we will dive into building results on the fly — a technique we call “scatter-gather aggregations.” We will then build on top of that to introduce “shuffling aggregations” capable of running extra computations like “HAVING” and “ORDER BY” on top of aggregate functions.
Scatter-gather aggregations
Aggregate queries without “HAVING” and “ORDER BY” can be executed in a fashion similar to filter queries. For filter queries, R2 SQL picks one node to be the coordinator in query execution. This node analyzes the query and consults R2 Data Catalog to figure out which Parquet row groups may contain data relevant to the query. Each Parquet row group represents a relatively small piece of work that a single compute node can handle. The coordinator node distributes the work across many worker nodes and collects results to return them to the user.
In order to execute aggregate queries, we follow all the same steps and distribute small pieces of work between worker nodes. However, this time instead of just filtering rows based on the predicate in the “WHERE” statement, worker nodes also compute pre-aggregates.
Pre-aggregates represent an intermediary state of an aggregation. This is an incomplete piece of data representing a partially computed aggregate function on a subset of data. Multiple pre-aggregates can be merged together to compute the final value of an aggregate function. Splitting aggregate functions into pre-aggregates allows us to horizontally scale computation of aggregation, making use of vast compute resources available in Cloudflare’s network.

For example, the pre-aggregate for “count(*)” is simply a number representing the count of rows in a subset of data. Computing the final “count(*)” is as easy as adding these numbers together. The pre-aggregate for “avg(value)” consists of two numbers: “sum(value)” and “count(*)”. The value of “avg(value)” can then be computed by adding together all “sum(value)” values, adding together all “count(*)” values and finally dividing the total sum by the total count.
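A pre-aggregate for “avg(value)” can be sketched as a small mergeable state. The `AvgState` class below is a hypothetical illustration, not R2 SQL's internal representation; it assumes each chunk of values stands in for the rows one worker sees.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class AvgState:
    """Partial state for avg(value): a running sum and a running count."""
    total: float = 0.0
    count: int = 0

    @staticmethod
    def from_values(values):
        values = list(values)
        return AvgState(sum(values), len(values))

    def merge(self, other):
        # Merging two pre-aggregates is just component-wise addition.
        return AvgState(self.total + other.total, self.count + other.count)

    def finalize(self):
        # Only at the very end do we divide sum by count.
        return self.total / self.count if self.count else None

# Hypothetical subsets of data, one per worker.
chunks = [[10.0, 20.0], [30.0], [40.0, 50.0, 60.0]]
state = AvgState()
for chunk in chunks:
    state = state.merge(AvgState.from_values(chunk))
average = state.finalize()
```

Keeping the sum and count separate matters: averaging the three per-chunk averages (15, 30 and 50) would give roughly 31.7, whereas the true average of all six values is 35.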
Once worker nodes have finished computing the pre-aggregates, they stream results to the coordinator node. The coordinator node collects all results, computes final values of aggregate functions from pre-aggregates, and returns the result to the user.
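The scatter-gather flow for a grouped “sum(value)” might look roughly like the sketch below. The function names and the in-memory row groups are assumptions made for illustration; in the real system the pieces of work are Parquet row groups and the results are streamed between nodes.

```python
def worker_pre_aggregate(row_group):
    """Runs on a worker: partial sum(value) per department for one row group."""
    partial = {}
    for department, value in row_group:
        partial[department] = partial.get(department, 0) + value
    return partial

def coordinator_merge(partials):
    """Runs on the coordinator: merge pre-aggregates into final values."""
    final = {}
    for partial in partials:
        for department, subtotal in partial.items():
            final[department] = final.get(department, 0) + subtotal
    return final

# Hypothetical row groups, each handled by a different worker.
row_groups = [
    [("A", 5), ("B", 7)],
    [("A", 3)],
    [("B", 1), ("C", 9)],
]
result = coordinator_merge(worker_pre_aggregate(rg) for rg in row_groups)
```

The coordinator only ever sees one small dictionary per piece of work, never the underlying rows.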
Shuffling, beyond the limits of scatter-gather
Scatter-gather is highly efficient when the coordinator can compute the final result by merging small, partial states from workers. If you run a query like SELECT sum(sales) FROM orders, the coordinator receives a single number from each worker and adds them up. The memory footprint on the coordinator is negligible regardless of how much data resides in R2.
However, this approach becomes inefficient when the query requires sorting or filtering based on the result of an aggregation. Consider this query, which finds the top two departments by sales volume:
SELECT department, sum(sales)
FROM sales
GROUP BY department
ORDER BY sum(sales) DESC
LIMIT 2
Correctly determining the global Top 2 requires knowing the total sales for every department across the entire dataset. Because the data is spread effectively at random across the underlying Parquet files, sales for a specific department are likely split across many different workers. A department might have low sales on every individual worker, excluding it from any local Top 2 list, yet have the highest sales volume globally when summed together.
The diagram below illustrates how a scatter-gather approach would not work for this query. "Dept A" is the global sales leader, but because its sales are evenly spread across workers, it doesn’t make it onto some local Top 2 lists, and ends up being discarded by the coordinator.

Consequently, when the query orders results by their global aggregation, the coordinator cannot rely on pre-filtered results from workers. It must request the partial total for every department from every worker to calculate the global totals before it can sort them. If you are grouping by a high-cardinality column like IP addresses or User IDs, this forces the coordinator to ingest and merge millions of rows, creating a resource bottleneck on a single node.
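The failure mode is easy to reproduce with made-up numbers. In the sketch below, the per-worker partial sums are hypothetical; "A" is the global leader but never appears in any worker's local Top 2.

```python
from collections import Counter

# Hypothetical partial sum(sales) per department on three workers.
worker_partials = [
    {"A": 30, "B": 50, "C": 40},
    {"A": 30, "D": 45, "E": 35},
    {"A": 30, "F": 55, "G": 32},
]

def top_n(totals, n):
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:n]

# Incorrect: each worker prunes to its local Top 2 before sending.
pruned = Counter()
for partial in worker_partials:
    pruned.update(dict(top_n(partial, 2)))  # Counter.update adds values
naive_top2 = top_n(pruned, 2)

# Correct but costly: the coordinator ingests every group from every worker.
full = Counter()
for partial in worker_partials:
    full.update(partial)
correct_top2 = top_n(full, 2)
```

With local pruning the coordinator reports "F" and "B", while the true answer is "A" (90 in total) followed by "F". Getting the right answer this way requires shipping all groups to one node, which is the bottleneck described above.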
To solve this, we need shuffling, a way to colocate data for specific groups before the final aggregation occurs.
Shuffling of aggregation data
To address the challenges of random data distribution, we introduce a shuffling stage. Instead of sending results to the coordinator, workers exchange data directly with each other to colocate rows based on their grouping key.
This routing relies on deterministic hash partitioning. When a worker processes a row, it hashes the GROUP BY column to identify the destination worker. Because this hash is deterministic, every worker in the cluster independently agrees on where to send specific data. If "Engineering" hashes to Worker 5, every worker knows to route "Engineering" rows to Worker 5. No central registry is required.
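A minimal sketch of deterministic hash partitioning follows. The choice of SHA-256 and the helper names are assumptions for illustration only; the post does not say which hash function R2 SQL uses.

```python
import hashlib

def destination_worker(group_key: str, num_workers: int) -> int:
    """Map a GROUP BY key to a worker index, identically on every node."""
    # A stable digest is used instead of Python's built-in hash(), which is
    # randomized per process for strings and so would not agree across nodes.
    digest = hashlib.sha256(group_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_workers

def partition(rows, num_workers):
    """Split a worker's rows into one outbox per destination worker."""
    outboxes = [[] for _ in range(num_workers)]
    for key, value in rows:
        outboxes[destination_worker(key, num_workers)].append((key, value))
    return outboxes

# Two hypothetical workers partition their own rows independently.
outboxes_w1 = partition([("Engineering", 1), ("Sales", 2)], num_workers=4)
outboxes_w2 = partition([("Engineering", 5)], num_workers=4)
```

Both workers place their "Engineering" rows in the outbox with the same index, without ever talking to each other.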
The diagram below illustrates this flow. Notice how "Dept A" starts on Workers 1, 2 and 3. Because the hash function maps "Dept A" to Worker 1, all workers route those rows to that same destination.

Shuffling aggregates produces the correct results. However, this all-to-all exchange creates a timing dependency. If Worker 1 begins calculating the final total for "Dept A" before Worker 3 has finished sending its share of the data, the result will be incomplete.
To address this, we enforce a strict synchronization barrier. The coordinator tracks the progress of the entire cluster while workers buffer their outgoing data and flush it via gRPC streams to their peers. Only when every worker confirms that it has finished processing its input files and flushing its shuffle buffers does the coordinator issue the command to proceed. This barrier guarantees that when the next stage begins, the dataset on each worker is complete and accurate.
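The bookkeeping behind such a barrier can be sketched as a set of workers that have not yet confirmed. The `ShuffleBarrier` class is hypothetical; it leaves out networking, gRPC streams and failure handling, and only shows the condition under which the coordinator may proceed.

```python
class ShuffleBarrier:
    """Coordinator-side tracker: proceed only when every worker has confirmed."""

    def __init__(self, worker_ids):
        self._pending = set(worker_ids)

    def confirm(self, worker_id):
        # Called once a worker has processed its input files
        # and flushed its shuffle buffers to its peers.
        self._pending.discard(worker_id)

    def can_proceed(self):
        return not self._pending

barrier = ShuffleBarrier(["w1", "w2", "w3"])
barrier.confirm("w1")
barrier.confirm("w3")
ready_before = barrier.can_proceed()  # "w2" is still flushing
barrier.confirm("w2")
ready_after = barrier.can_proceed()
```

Until the last worker confirms, no worker may start finalizing its groups, since rows for those groups could still be in flight.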
Local finalization
Once the synchronization barrier is lifted, every worker holds the complete dataset for its assigned groups. Worker 1 now has 100% of the sales records for "Dept A" and can calculate the final total with certainty.
This allows us to push computational logic like filtering and sorting down to the worker rather than burdening the coordinator. For example, if the query includes HAVING count(*) > 5, the worker can filter out groups that do not meet this criterion immediately after aggregation.
At the end of this stage, each worker produces a sorted, finalized stream of results for the groups it owns.
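Local finalization on a single worker might look like the sketch below. The `finalize_locally` function and its input shape (a complete sum and count per owned group) are assumptions made for this example.

```python
def finalize_locally(owned_groups, min_count=5):
    """owned_groups maps department -> (sum, count), complete after the shuffle."""
    # HAVING count(*) > min_count, applied on the worker.
    kept = [
        (department, total)
        for department, (total, count) in owned_groups.items()
        if count > min_count
    ]
    # ORDER BY sum DESC, so the worker emits a sorted stream.
    kept.sort(key=lambda row: row[1], reverse=True)
    return kept

# Hypothetical groups owned by one worker.
owned = {"Dept A": (90, 12), "Dept D": (45, 3), "Dept G": (32, 8)}
local_stream = finalize_locally(owned)
```

"Dept D" is dropped on the worker itself, so it never reaches the coordinator.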
The streaming merge
The final piece of the puzzle is the coordinator. In the scatter-gather model, the coordinator was responsible for the expensive task of aggregating and sorting the entire dataset. In the shuffling model, its role changes.
Because the workers have already computed the final aggregates and sorted them locally, the coordinator only needs to perform a k-way merge. It opens a stream to every worker and reads the results row by row. It compares the current row from each worker, picks the "winner" based on the sort order, and adds it to the query results that will be sent to the user.
This approach is particularly powerful for LIMIT queries. If a user asks for the top 10 departments, the coordinator merges the streams until it has found the top 10 items and then immediately stops processing. It does not need to load or merge the millions of remaining rows, allowing for greater scale of operation without over-consumption of compute resources.
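A k-way merge with early termination can be sketched with Python's standard library. Here `heapq.merge` stands in for the coordinator's merge and `itertools.islice` for the LIMIT; the `logged` generator and the sample streams are hypothetical and only exist to show how few rows are actually read.

```python
import heapq
from itertools import islice

def merge_top(worker_streams, limit):
    """Merge streams already sorted by total (descending) and stop at `limit` rows."""
    merged = heapq.merge(*worker_streams, key=lambda row: row[1], reverse=True)
    return list(islice(merged, limit))

def logged(rows, log):
    """Yield rows lazily, recording each row the coordinator actually pulls."""
    for row in rows:
        log.append(row)
        yield row

log = []
streams = [
    logged([("A", 90), ("D", 45)], log),
    logged([("F", 55), ("B", 50), ("G", 32)], log),
    logged([("C", 40), ("E", 35)], log),
]
top2 = merge_top(streams, 2)
```

Because the merge is lazy, `log` ends up holding fewer rows than the workers have available: the coordinator stops pulling as soon as the limit is reached.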
A powerful engine for processing massive datasets
With the addition of aggregations, R2 SQL transforms from a tool great for filtering data into a powerful engine capable of data processing on massive datasets. This is made possible by implementing distributed execution strategies like scatter-gather and shuffling, where we are able to push the compute to where the data lives, using the scale of Cloudflare’s global compute and network.
Whether you are generating reports, monitoring high-volume logs for anomalies, or simply trying to spot trends in your data, you can now easily do it all within Cloudflare’s Developer Platform without the overhead of managing complex OLAP infrastructure or moving data out of R2.
Try it now
Support for aggregations in R2 SQL is available today. We are excited to see how you use these new functions with data in R2 Data Catalog.
Get Started: Check out our documentation for examples and syntax guides on running aggregation queries.
Join the Conversation: If you have questions, feedback, or want to share what you’re building, join us in the Cloudflare Developer Discord.