Our integration service connects our platform to external systems. Earlier this year, we reached a scaling limit at 40 integrations and rebuilt it from the ground up.
The service handles three primary responsibilities: sending data to external systems, managing job queues, and prioritizing work based on criticality. The original implementation functioned but had architectural constraints that prevented horizontal scaling.
We use microservices because different components have conflicting requirements. The management API handles complex business logic with normalized schemas—separate tables for translations and categories. The public API optimizes for read performance under load, using denormalized data by adding translations directly into category tables and handling filtering in Go. A monolithic architecture would require compromising performance in one area to accommodate the other.
The integration service currently processes millions of events daily, with volume increasing as we onboard new customers.
This post describes our implementation of a queue system using PostgreSQL and Go, focusing on design decisions and technical trade-offs.
## Limitations of v1
The first implementation used GCP Pub/Sub, a topic-to-many-subscriptions model in which each published message is replicated to every subscription. This architecture introduced several scalability issues.
### Data Dependency
The integration service maintained a database for integration configurations but lacked ownership of its operational data. This violated a distributed systems principle: services should own their data rather than depend on other services for it.
This dependency forced our management service to serialize complete payloads into the queue. Updating a single attribute on a sub-object required sending the entire parent object with all nested sub-objects, metadata, and relationships. Different external APIs have varying data requirements—some need individual sub-objects while others require complete hierarchies.
For clients with records containing 300-500 sub-objects, this resulted in significant message size inflation. GCP charges by message size rather than count, making large messages substantially more expensive than smaller ones.
### Memory and Scaling Constraints
GCP Pub/Sub's streaming delivery requires clients to buffer messages internally. With 40 integrations running separate consumers with filters, traffic spikes created memory pressure:
- Mass updates generate large objects per record
- Objects are duplicated for each configured integration
- Copies buffer across 5-10 consumer instances
- Infrastructure requires 2GB RAM and 2 cores to handle spikes, despite needing only 512MB and 1 core during normal operation
This prevented horizontal scaling and limited us to vertical scaling approaches.
### Rate Limiting Challenges
External APIs enforce varying rate limits. Our in-memory rate limiter tracked requests per integration but prevented horizontal scaling since state couldn’t be shared across instances without risking rate limit violations.
### System Breaking Point
By early 2025, these issues had compounded: excessive message sizes increasing costs, memory bloat requiring oversized containers, vertical-only scaling, high operational expenses, rate limiting preventing horizontal scale, and lack of data independence.
The system couldn’t accommodate our growth trajectory. A complete rebuild was necessary.
## Architectural Changes in v2
The v2 design addressed specific limitations:
- Horizontal scaling - Enable scaling across multiple containers
- Distributed rate limiting - Coordinate rate limits across instances
- Data ownership - Store operational data within the service
- Delta updates - Send only changed data rather than complete records
- Fair scheduling - Prevent single integrations from monopolizing resources
Additional improvements:
- Priority queuing - Process critical updates before lower-priority changes
- Self-service re-sync - Enable customers to re-sync catalogs independently
- Visibility - Provide APIs for customers to monitor sent data and queue status
## Deduplication Strategy
The standard approach involves the producer computing payloads and sending them to the queue for consumer processing. We used this in v1 but rejected it for v2.
Customers frequently make multiple rapid changes to the same record—updating a title, then a price, then a description. Each change triggers an event. Instead of sending three separate updates, we consolidate changes into a single update.
We implemented a `deduplication_id` column in the jobs table. Multiple updates to the same record within a short time window are deduplicated into a single job, reducing load on both our system and recipient systems.
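One way to implement this at insert time is an upsert keyed on `deduplication_id`. The sketch below is illustrative rather than our exact code: it assumes a partial unique index on `deduplication_id` for pending jobs, a hypothetical `NewJob` struct mirroring the jobs table introduced in the next section, and the pgx connection pool used throughout this post.

```go
// Hypothetical helper type mirroring the non-generated jobs columns.
type NewJob struct {
	AppID           uuid.UUID
	IntegrationID   uuid.UUID
	IntegrationType string
	CorrelationID   uuid.UUID
	JobType         string
	Payload         []byte // JSON document
	DeduplicationID string
}

// EnqueueJob deduplicates at insert time. It assumes a partial unique index:
//
//   CREATE UNIQUE INDEX jobs_dedup_pending_idx
//       ON jobs (deduplication_id) WHERE status = 'pending';
//
// so a second change to the same record, arriving while the first job is
// still pending, overwrites the pending payload instead of adding a new row.
func EnqueueJob(ctx context.Context, db *pgxpool.Pool, j NewJob) error {
	_, err := db.Exec(ctx, `
		INSERT INTO jobs (app_id, integration_id, integration_type,
		                  correlation_id, job_type, payload, deduplication_id)
		VALUES ($1, $2, $3, $4, $5, $6, $7)
		ON CONFLICT (deduplication_id) WHERE status = 'pending'
		DO UPDATE SET payload    = EXCLUDED.payload,
		              created_at = CURRENT_TIMESTAMP`,
		j.AppID, j.IntegrationID, j.IntegrationType,
		j.CorrelationID, j.JobType, j.Payload, j.DeduplicationID)
	return err
}
```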
## PostgreSQL as Queue Backend
We chose PostgreSQL as our queue backend for several reasons:
- Performance - PostgreSQL is fast enough for our use case. We don’t need sub-second message delivery.
- Simplicity - Using a managed PostgreSQL instance on GCP is significantly simpler than introducing new infrastructure.
- Familiarity - Most developers understand SQL, reducing onboarding time.
- Existing infrastructure - We already use PostgreSQL for our data, eliminating the need for additional systems.
We often assume we need something bigger, like Apache Kafka, when a relational database like PostgreSQL is sufficient for the requirements.
The jobs table structure:
```sql
CREATE TABLE public.jobs (
    id uuid DEFAULT public.uuid_generate_v7() NOT NULL,
    app_id uuid NOT NULL,
    integration_id uuid NOT NULL,
    integration_type text NOT NULL,
    correlation_id uuid NOT NULL,
    job_type text NOT NULL,
    payload jsonb NOT NULL,
    deduplication_id text NOT NULL,
    status public.job_status DEFAULT 'pending'::public.job_status NOT NULL,
    error_message text,
    retry_count numeric,
    started_at timestamp with time zone,
    failed_at timestamp with time zone,
    completed_at timestamp with time zone,
    retry_at timestamp with time zone,
    created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP
);
```
Each job tracks:
- `correlation_id` - Links logs across services
- `job_type` - Specifies the action (e.g., `record.updated`)
- `error_message` - Records failure details
- `status` - Tracks current workflow state
- `retry_count` - Counts retry attempts
- `retry_at` - Schedules next retry
- `started_at`, `failed_at`, `completed_at` - Provides metrics for observability
- `integration_id` - Links to specific integrations
- `integration_type` - Identifies the platform
- `payload` - Contains job data
- `deduplication_id` - Prevents duplicate execution
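The `status` column uses a `public.job_status` enum that the DDL above references but doesn't define. A plausible definition, covering the states discussed in the rest of this post (the exact values and their order are assumptions):

```go
// Presumed enum definition, kept as a migration statement. The real service
// may name or order the values differently.
const createJobStatusEnum = `
CREATE TYPE public.job_status AS ENUM (
    'pending',
    'processing',
    'to-be-retried',
    'retrying',
    'completed',
    'failed'
);`
```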
### Creating Indexes
Postgres-backed queues require careful indexing. We use partial indexes (with WHERE clauses) only for the actively queried states: `processing`, `to-be-retried`, `retrying`, and `pending`.
We don't index the `failed` or `completed` states. These statuses hold the majority of rows in the table and are never read by the job processing flow, so indexing them would only consume memory for data the hot path never touches. Jobs are ordered by `created_at` for FIFO processing, with priority-queue overrides when applicable.
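As a sketch of what those partial indexes can look like (the index names, column choices, and migration helper are illustrative, not our actual schema):

```go
// Illustrative partial indexes for the dispatch path. Only states the
// dispatcher actually queries are indexed; completed and failed rows never
// enter these indexes.
var queueIndexes = []string{
	// Jobs the dispatcher may pick up, ordered for FIFO scans per integration.
	`CREATE INDEX IF NOT EXISTS jobs_dispatchable_idx
	     ON jobs (integration_id, created_at)
	     WHERE status IN ('pending', 'retrying', 'to-be-retried')`,
	// Jobs currently in flight, used by the fair-scheduling query shown later.
	`CREATE INDEX IF NOT EXISTS jobs_processing_idx
	     ON jobs (integration_id)
	     WHERE status = 'processing'`,
}

func applyQueueIndexes(ctx context.Context, db *pgxpool.Pool) error {
	for _, stmt := range queueIndexes {
		if _, err := db.Exec(ctx, stmt); err != nil {
			return err
		}
	}
	return nil
}
```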
### Job Lifecycle
Jobs follow a defined lifecycle:
1. **Created** → Initial state: `pending`
2. **Picked up** → Transitions to `processing`
3. **Success** → Becomes `completed`, records `completed_at`
4. **Failed (10 retries exhausted)** → Becomes `failed`, records `failed_at`
5. **Failed (retries remaining)** → Becomes `to-be-retried`, increments `retry_count`, calculates `retry_at`
Timestamp fields serve observability purposes, measuring job duration and identifying bottlenecks.
For `to-be-retried` jobs, retry timing is calculated using exponential backoff.
### Worker System Architecture
The worker system requirements:
- Parallel worker execution
- Horizontal scaling across containers
- Graceful shutdowns without job loss
- Distributed rate limit enforcement—we need to respect rate limits no matter how many containers we run
#### Configuration
```go
const (
	DefaultWorkerCount        = 10
	DefaultJobFetchLimit      = 20
	DefaultDispatcherInterval = 1 * time.Second
	DefaultMaxRetries         = 10
)

var RetryBackoffSchedule = []time.Duration{
	0,                // Retry 1: instant
	10 * time.Second, // Retry 2: 10 seconds
	30 * time.Second, // Retry 3: 30 seconds
	1 * time.Minute,  // Retry 4: 1 minute
	2 * time.Minute,  // Retry 5: 2 minutes
	5 * time.Minute,  // Retry 6: 5 minutes
	10 * time.Minute, // Retry 7: 10 minutes
	15 * time.Minute, // Retry 8: 15 minutes
	20 * time.Minute, // Retry 9: 20 minutes
	30 * time.Minute, // Retry 10: 30 minutes
}

type WorkerConfig struct {
	WorkerCount        int
	JobFetchLimit      int
	DispatcherInterval time.Duration
}

func DefaultWorkerConfig() WorkerConfig {
	return WorkerConfig{
		WorkerCount:        DefaultWorkerCount,
		JobFetchLimit:      DefaultJobFetchLimit,
		DispatcherInterval: DefaultDispatcherInterval,
	}
}

func GetRetryBackoff(retryCount int64) time.Time {
	if config.Environment == "test" {
		return time.Now()
	}

	var backoffDuration time.Duration
	if int(retryCount) < len(RetryBackoffSchedule) {
		backoffDuration = RetryBackoffSchedule[retryCount]
	} else {
		backoffDuration = 30 * time.Minute
	}

	return time.Now().Add(backoffDuration)
}
```
#### Worker Implementation
```go
type Worker struct {
	db            *pgxpool.Pool
	workerCount   int
	jobFetchLimit int
	fetchInterval time.Duration
	ctx           context.Context
	cancel        context.CancelFunc
	wg            *sync.WaitGroup
	shutdown      chan struct{}
}

func NewWorker(db *pgxpool.Pool, config WorkerConfig) *Worker {
	ctx, cancel := context.WithCancel(context.Background())
	wg := sync.WaitGroup{}

	return &Worker{
		db:            db,
		workerCount:   config.WorkerCount,
		jobFetchLimit: config.JobFetchLimit,
		fetchInterval: config.DispatcherInterval,
		ctx:           ctx,
		cancel:        cancel,
		shutdown:      make(chan struct{}),
		wg:            &wg,
	}
}

func (w *Worker) Start() {
	herolog.Info(w.ctx).Int("workers", w.workerCount).Msg("Starting worker queue system")

	for i := 0; i < w.workerCount; i++ {
		w.wg.Add(1)
		go w.worker(i)
	}

	herolog.Info(w.ctx).Msg("Worker queue system started")
}

func (w *Worker) Stop() {
	herolog.Info(w.ctx).Msg("Stopping worker queue system")

	// Tell the goroutines to stop fetching more jobs and
	// put the remaining jobs into the to-be-retried state.
	w.cancel()
	close(w.shutdown)

	// Wait for each goroutine to stop.
	w.wg.Wait()

	herolog.Info(w.ctx).Msg("Worker queue system stopped")
}

func (w *Worker) worker(workerID int) {
	defer w.wg.Done()

	workerName := fmt.Sprintf("worker-%d", workerID)
	herolog.Info(w.ctx).Str("worker", workerName).Msg("Worker started")

	ticker := time.NewTicker(w.fetchInterval)
	defer ticker.Stop()

	for {
		select {
		case <-w.ctx.Done():
			herolog.Info(w.ctx).Str("worker", workerName).Msg("Worker stopping")
			return
		case <-w.shutdown:
			herolog.Info(w.ctx).Str("worker", workerName).Msg("Worker shutdown signal received")
			return
		case <-ticker.C:
			jobs, err := database.FetchJobs(w.ctx, w.db, w.jobFetchLimit)
			if err != nil {
				herolog.Err(w.ctx, err).Str("worker", workerName).Msg("Failed to fetch jobs")
				continue
			}
			if len(jobs) == 0 {
				herolog.Debug(w.ctx).Msg("got 0 jobs")
				continue
			}

			herolog.Debug(w.ctx).Str("worker", workerName).Int("count", len(jobs)).Msg("Fetched jobs")

			for _, job := range jobs {
				select {
				case <-w.ctx.Done():
					herolog.Info(w.ctx).Str("worker", workerName).Msg("Worker stopping, retrying current job")
					shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
					if err := database.RetryJob(shutdownCtx, w.db, job.ID, time.Now()); err != nil {
						herolog.Err(w.ctx, err).Str("job_id", job.ID.String()).Msg("Failed to retry job during shutdown")
					}
					cancel()
					return
				case <-w.shutdown:
					herolog.Info(w.ctx).Str("worker", workerName).Msg("Worker shutdown, retrying current job")
					shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
					if err := database.RetryJob(shutdownCtx, w.db, job.ID, time.Now()); err != nil {
						herolog.Err(w.ctx, err).Str("job_id", job.ID.String()).Msg("Failed to retry job during shutdown")
					}
					cancel()
					return
				default:
					w.ProcessJob(workerName, job)
				}
			}
		}
	}
}
```
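Wiring this into the service entrypoint is then a matter of starting the workers and stopping them on a termination signal. A sketch, assuming the pgxpool setup lives elsewhere in the service:

```go
// runWorkers blocks until the container is asked to terminate, then shuts the
// queue system down gracefully so in-flight jobs are handed back to the queue.
func runWorkers(ctx context.Context, db *pgxpool.Pool) {
	w := NewWorker(db, DefaultWorkerConfig())
	w.Start()

	stop, cancel := signal.NotifyContext(ctx, syscall.SIGTERM, syscall.SIGINT)
	defer cancel()
	<-stop.Done()

	w.Stop()
}
```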
#### Worker Operation
We evaluated two approaches: maintaining in-memory queues with multiple goroutines using `for` and `select` to fetch jobs, or having goroutines fetch data from the database and iterate over the results.
We chose the database iteration approach for its simplicity. [pgxpool](https://pkg.go.dev/github.com/jackc/pgx/v4/pgxpool) handles connection pooling, eliminating the need for channel-based in-memory queues.
Each worker runs in a separate goroutine, using a [ticker](https://gobyexample.com/tickers) to poll for jobs every second. Before processing, workers check for shutdown signals (`ctx.Done()` or the `shutdown` channel). When shutdown is initiated, workers stop accepting new jobs and mark in-flight jobs as `to-be-retried`.
This prevents stalled jobs from blocking integration queues. Checking shutdown signals between jobs ensures clean shutdowns.
During shutdown, we create a fresh context, with its own short timeout, for retrying jobs. This prevents database write failures when the main context is canceled.
#### Fair Scheduling Query
The `FetchJobs` query implements fair scheduling to prevent high-volume integrations from monopolizing workers:
```sql
WITH integration_job_counts AS (
    SELECT integration_id, COUNT(*) as processing_count
    FROM jobs
    WHERE status = 'processing'
    GROUP BY integration_id
    HAVING COUNT(*) >= 50
)
UPDATE jobs
SET status = 'processing', started_at = CURRENT_TIMESTAMP
WHERE id IN (
    SELECT j.id
    FROM jobs j
    WHERE j.status IN ('pending', 'retrying')
      AND j.integration_id NOT IN (SELECT integration_id FROM integration_job_counts)
    ORDER BY
      CASE
        WHEN j.job_type = ANY(ARRAY['attribute.update', 'attribute.delta_update', 'metadata.update', 'metadata.single_update']) THEN 0
        ELSE 1
      END,
      j.created_at ASC
    LIMIT @limit
    FOR UPDATE SKIP LOCKED
)
RETURNING id, app_id, correlation_id, integration_id, integration_type, job_type, payload, status, error_message, retry_count, started_at, failed_at, retry_at, completed_at, created_at, deduplication_id
```
**Query breakdown:**
**Step 1: Identify busy integrations**
```sql
WITH integration_job_counts AS (
    SELECT integration_id, COUNT(*) as processing_count
    FROM jobs
    WHERE status = 'processing'
    GROUP BY integration_id
    HAVING COUNT(*) >= 50
)
```
This CTE identifies integrations with 50+ concurrent processing jobs.
**Step 2: Select jobs with priority ordering**
```sql
SELECT j.id
FROM jobs j
WHERE j.status IN ('pending', 'retrying')
  AND j.integration_id NOT IN (SELECT integration_id FROM integration_job_counts)
ORDER BY
  CASE
    WHEN j.job_type = ANY(ARRAY['attribute.update', 'attribute.delta_update', 'metadata.update', 'metadata.single_update']) THEN 0
    ELSE 1
  END,
  j.created_at ASC
LIMIT @limit
FOR UPDATE SKIP LOCKED
```
Jobs are selected only from integrations that aren't on the busy list. Priority updates are ordered first, followed by FIFO ordering. `FOR UPDATE SKIP LOCKED` locks the selected rows for the current transaction and makes concurrent workers skip rows that are already locked instead of blocking, preventing duplicate processing.
**Step 3: Update job status**
```sql
UPDATE jobs
SET status = 'processing', started_at = CURRENT_TIMESTAMP
WHERE id IN (
    ...
)
RETURNING id, app_id, correlation_id, integration_id, integration_type, job_type, payload, status, error_message, retry_count, started_at, failed_at, retry_at, completed_at, created_at, deduplication_id
```
Selected jobs are updated to `processing` status with a recorded start time.
This ensures fair resource allocation across integrations.
#### Timeout Implementation
Job timeouts are critical for queue health. In the initial release, we reused the global context for job processing. When jobs hung waiting for slow external APIs, they couldn't be marked completed or failed due to context lifecycle coupling.
Jobs accumulated in `processing` state indefinitely.
The solution: context separation. The global context controls the worker lifecycle. Each job receives its own context with a timeout. Timed-out jobs are marked `to-be-retried`, allowing queue progression.
This also enables database writes during shutdown using a fresh context, even when the global context is canceled.
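A condensed sketch of that separation. The two-minute timeout, the inline SQL, and the function shape are illustrative assumptions (the real processing path also goes through the error classification described below); it reuses `GetRetryBackoff` and the uuid type from the code above.

```go
// Sketch only: the send gets its own deadline, independent of the worker's
// lifecycle context, and status writes use a fresh short-lived context so
// they still succeed if the global context was cancelled during shutdown.
func processWithTimeout(db *pgxpool.Pool, jobID uuid.UUID, retryCount int64,
	send func(context.Context) error) error {

	jobCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	sendErr := send(jobCtx)

	writeCtx, cancelWrite := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelWrite()

	if sendErr != nil {
		// Timed-out or failed sends are handed back to the queue with backoff.
		_, dbErr := db.Exec(writeCtx, `
			UPDATE jobs
			SET status = 'to-be-retried',
			    retry_count = COALESCE(retry_count, 0) + 1,
			    retry_at = $2
			WHERE id = $1`, jobID, GetRetryBackoff(retryCount))
		return errors.Join(sendErr, dbErr)
	}

	_, dbErr := db.Exec(writeCtx, `
		UPDATE jobs
		SET status = 'completed', completed_at = CURRENT_TIMESTAMP
		WHERE id = $1`, jobID)
	return dbErr
}
```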
#### Retry Logic
Failed jobs require retry logic with appropriate timing. Immediate retries against failing external APIs are counterproductive. We implement exponential backoff: an instant first retry, 10 seconds for the second, 30 seconds for the third, up to 30 minutes.
The `retry_count` field drives the backoff calculation. After 10 attempts, jobs are marked `failed`.
Error types guide retry behavior:
- `NonRetryableError` - Permanent failures (e.g., validation errors). No retry.
- `RetryableError` - Transient failures (e.g., 500 Internal Server Error). Retry with backoff.
- `MaxRetryExceededError` - Retry limit reached. Mark failed.
This allows each integration to decide how to handle errors based on the external API's response. For example, a 400 Bad Request might be a permanent validation failure (`NonRetryableError`), while a 503 Service Unavailable is transient and should be retried (`RetryableError`). The integration implementation determines the appropriate error type for each scenario.
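A sketch of how these error types and the classification might look inside an integration. The type names follow the list above; the concrete status-code mapping is an example of one policy, not our exhaustive one.

```go
// RetryableError marks transient failures that should go back to the queue
// with backoff; NonRetryableError marks permanent failures that should not.
type RetryableError struct{ Err error }

func (e *RetryableError) Error() string { return "retryable: " + e.Err.Error() }
func (e *RetryableError) Unwrap() error { return e.Err }

type NonRetryableError struct{ Err error }

func (e *NonRetryableError) Error() string { return "non-retryable: " + e.Err.Error() }
func (e *NonRetryableError) Unwrap() error { return e.Err }

// classifyResponse maps an external API response onto these error types.
func classifyResponse(resp *http.Response) error {
	switch {
	case resp.StatusCode < 300:
		return nil
	case resp.StatusCode == http.StatusBadRequest,
		resp.StatusCode == http.StatusUnprocessableEntity:
		return &NonRetryableError{Err: fmt.Errorf("rejected by external API: %s", resp.Status)}
	default:
		// 429s, 5xx, timeouts, and anything we can't rule out: retry.
		return &RetryableError{Err: fmt.Errorf("transient failure: %s", resp.Status)}
	}
}
```

The queue side then inspects the returned type (for example with `errors.As`) and either schedules a retry with backoff or marks the job failed; once `retry_count` reaches the maximum, the result is treated as `MaxRetryExceededError` and the job is marked `failed`.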
#### Stalled Job Recovery
Jobs occasionally become stuck in `processing` state due to worker panics, database connection failures, or unexpected container termination.
A cron job runs every minute, identifying jobs in `processing` state beyond the expected duration. These jobs are moved to `to-be-retried` with incremented retry counts, treating them as standard failures.
This ensures queue progression despite unexpected failures.
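A sketch of that sweep, assuming a ten-minute threshold for "beyond the expected duration" (the real threshold is configuration we don't cover here):

```go
// recoverStalledJobs runs from a cron every minute. Anything stuck in
// 'processing' past the threshold is handed back to the queue as a normal
// retry.
func recoverStalledJobs(ctx context.Context, db *pgxpool.Pool) (int64, error) {
	tag, err := db.Exec(ctx, `
		UPDATE jobs
		SET status = 'to-be-retried',
		    retry_count = COALESCE(retry_count, 0) + 1,
		    retry_at = CURRENT_TIMESTAMP
		WHERE status = 'processing'
		  AND started_at < CURRENT_TIMESTAMP - INTERVAL '10 minutes'`)
	if err != nil {
		return 0, err
	}
	return tag.RowsAffected(), nil
}
```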
#### Distributed Rate Limiting
Rate limiting across multiple containers was v2's most complex challenge.
V1's in-memory rate limiter worked for single containers but couldn't share state across instances. While Redis was an option, we already had PostgreSQL with sufficient performance.
The solution: a `rate_limits` table tracking request counts per integration per second:
```sql
CREATE TABLE public.rate_limits (
    bucket_key character varying(255) NOT NULL,
    request_count integer DEFAULT 0 NOT NULL,
    window_start timestamp without time zone NOT NULL,
    created_at timestamp without time zone DEFAULT now(),
    updated_at timestamp without time zone DEFAULT now()
);
```
Before each external API request, we increment the counter for the integration's current time window (rounded to the second). PostgreSQL returns the new count.
If the count exceeds the limit, we sleep 250ms and retry. If under the limit, we proceed.
This works because all containers share the database as the source of truth for rate limiting.
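A sketch of that check, assuming a unique constraint on `(bucket_key, window_start)` and a per-integration limit supplied by the caller; the real bucket-key format and sleep handling may differ.

```go
// waitForRateLimit blocks until the integration is under its per-second limit.
// Every container runs the same code against the same table, so the counter in
// PostgreSQL is the single source of truth. Each attempt increments the
// counter, which makes the sketch slightly conservative under contention.
func waitForRateLimit(ctx context.Context, db *pgxpool.Pool, bucketKey string, limitPerSecond int) error {
	for {
		window := time.Now().UTC().Truncate(time.Second)

		var count int
		err := db.QueryRow(ctx, `
			INSERT INTO rate_limits (bucket_key, request_count, window_start)
			VALUES ($1, 1, $2)
			ON CONFLICT (bucket_key, window_start)
			DO UPDATE SET request_count = rate_limits.request_count + 1,
			              updated_at = now()
			RETURNING request_count`, bucketKey, window).Scan(&count)
		if err != nil {
			return err
		}
		if count <= limitPerSecond {
			return nil // under the limit for this second: send the request
		}

		// Over the limit: wait briefly, then try again (likely the next window).
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(250 * time.Millisecond):
		}
	}
}
```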
Occasionally, jobs are still rate-limited during heavy load because of the gap between checking the count and sending the request. These jobs retry immediately, and the rate at which this happens is acceptable.
## The End
Hope you enjoyed this article and learned something new.
This system has worked really well so far, and we've had only a few minor issues that we fixed quickly.
I will update this article over time.