This post continues the event handling series. Check out the previous post on Inbox Pattern to learn how we handle event ordering and idempotency.
The Problem with Historical Events
When you create a new service that subscribes to events from other domains, you face a common challenge: your service needs historical data to function independently.
Imagine you're building an Order Service that needs to display user information. You subscribe to user events like UserCreated, PersonalDataUpdated, and StatusUpdated. But what about users who were created before your service existed? Your database is empty, and you can't show any order details properly.
This applies whether your services are deployed separately or as part of a modular monolith. The key is that each service owns its data and communicates through events.
Common Approaches
Teams typically handle this in a few ways:
- Manual data migration: write custom scripts to copy data from the source service's database. This breaks service boundaries and creates tight coupling.
- Batch import endpoints: ask the source team to build special APIs for bulk data export. This requires coordination and extra development work.
- Accept the gap: start fresh and only handle new events. This means your service is incomplete until enough time passes.
All of these approaches are painful and error-prone.
A Better Way: Automatic Event Bootstrap
The solution is surprisingly elegant: fetch historical events the same way you'll receive future events.
Here's the key insight: if every service provides a simple method to fetch events by topic and timestamp, any dependent service can automatically bootstrap itself during startup.
How It Works
The VT framework provides all the infrastructure for event bootstrapping. On the service side, you only need two simple steps:
Step 1: Create a Storage Adapter
Create a simple storage adapter for tracking bootstrap progress:
@Component
class OrderEventBootstrapStorageAdapter(
    repository: OrderEventBootstrapRepository,
) : VTMongoEventBootstrapStorageAdapter(repository)
The adapter extends the framework's MongoDB base class and handles saving and loading bootstrap records. This example uses MongoDB, but the storage can live in any database, such as PostgreSQL.
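Here is a minimal in-memory sketch that makes the adapter's job concrete. Several pieces are assumptions, not the VT API:

- The `EventBootstrapStorage` interface and its `get`/`save` methods are modeled on the `storage.get(...)` and `storage.save(...)` calls in the framework code later in this post.
- The record fields mirror the progress document shown under Progress Tracking.

```kotlin
import java.time.Instant
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

enum class EventBootstrapStatus { PENDING, IN_PROGRESS, DONE, FAILED }

// Assumed record shape, mirroring the stored progress document.
data class EventBootstrapRecord(
    val id: String = UUID.randomUUID().toString(),
    val consumerName: String,
    val topics: Set<String>,
    val lastCreatedAt: Instant = Instant.EPOCH,
    val processedCount: Int = 0,
    val status: EventBootstrapStatus = EventBootstrapStatus.PENDING,
)

// Hypothetical storage contract: load all records of a consumer, upsert one record.
interface EventBootstrapStorage {
    fun get(consumerName: String): List<EventBootstrapRecord>
    fun save(record: EventBootstrapRecord)
}

// In-memory adapter, e.g. for tests; a real adapter would write to MongoDB, PostgreSQL, etc.
class InMemoryEventBootstrapStorage : EventBootstrapStorage {
    private val records = ConcurrentHashMap<String, EventBootstrapRecord>()

    override fun get(consumerName: String): List<EventBootstrapRecord> =
        records.values.filter { it.consumerName == consumerName }

    override fun save(record: EventBootstrapRecord) {
        records[record.id] = record // same id overwrites, so progress updates replace the old state
    }
}
```

Keying by record id rather than by consumer name matters because one consumer can own several records, as the Adding New Features Later section shows.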
Step 2: Add the EventBootstrap Annotation
Add the annotation to your consumer class:
@EventConsumer
@EventBootstrap(
    consumerName = "UserView", // define a static unique name
    storageBean = OrderEventBootstrapStorageAdapter::class, // use defined adapter
    clientBean = UserEventClientV1::class // each service provides an event client
)
class UserEventsConsumerV1(
    private val orchestrator: UserViewOrchestrator,
) {
    fun onCreated(event: EventV1<UserCreatedV1>) {
        orchestrator.create(/* ... */)
    }
    fun onPersonalDataUpdated(event: EventV1<PersonalDataUpdatedV1>) {
        orchestrator.update(/* ... */)
    }
    fun onStatusUpdated(event: EventV1<UserStatusUpdatedV1>) {
        orchestrator.update(/* ... */)
    }
}
That's it! No special bootstrap code needed. The framework handles everything else.
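If you are curious what such an annotation looks like on the Kotlin side, here is a sketch. It declares a hypothetical `EventBootstrap` with the parameters used in this post and reads it back through Java reflection, as a startup scanner might. The stub classes and the `autoStart = true` default are assumptions, and the real VT declaration may differ.

```kotlin
import kotlin.reflect.KClass

// Hypothetical declaration; the real VT annotation may differ.
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
annotation class EventBootstrap(
    val consumerName: String,      // stable key for the progress records
    val storageBean: KClass<*>,    // bean that persists bootstrap records
    val clientBean: KClass<*>,     // bean that fetches historical events
    val autoStart: Boolean = true, // assumed default; false means start manually
)

// Stand-ins for the real storage adapter and event client beans.
class UserStorageStub
class UserClientStub

@EventBootstrap(
    consumerName = "UserView",
    storageBean = UserStorageStub::class,
    clientBean = UserClientStub::class,
)
class UserEventsConsumerStub

// Reads the annotation at runtime; returns null for classes without it.
fun bootstrapConfigOf(consumer: Any): EventBootstrap? =
    consumer::class.java.getAnnotation(EventBootstrap::class.java)
```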
What the Framework Does
The VT framework provides the complete bootstrapping infrastructure.
Event Fetching Interface
The framework defines a standard interface that each domain service implements:
interface VTEventClientV1 {
    suspend fun fetch(request: EventsFetchRequestV1): List<BrokerMessageV1>
}
data class EventsFetchRequestV1(
    val body: EventsFetchRequestBodyV1,
    val timeout: Duration? = null,
)
data class EventsFetchRequestBodyV1(
    val topics: Set<String>,
    val after: Instant,
    val itemsCount: Int
)
This can be implemented using internal clients (for modular monoliths or testing), REST APIs, WebSockets, or any other protocol. The important part is the consistent interface.
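For a modular monolith or a test, an implementation can be as simple as filtering an in-process event log. The sketch below is hypothetical:

- `FetchRequest`, `BrokerMessage`, `EventClient` and `InMemoryEventClient` are simplified stand-ins for the framework types.
- `suspend` is dropped so the code runs without kotlinx.coroutines.

```kotlin
import java.time.Instant

// Simplified stand-ins for EventsFetchRequestBodyV1 and BrokerMessageV1 (assumed shapes).
data class FetchRequest(val topics: Set<String>, val after: Instant, val itemsCount: Int)
data class BrokerMessage(val id: String, val topic: String, val createdAt: Instant, val body: Any)

// Non-suspending variant of VTEventClientV1 so the sketch runs without kotlinx.coroutines.
interface EventClient {
    fun fetch(request: FetchRequest): List<BrokerMessage>
}

// Hypothetical in-process client backed by an event log held in memory.
class InMemoryEventClient(private val log: List<BrokerMessage>) : EventClient {
    override fun fetch(request: FetchRequest): List<BrokerMessage> =
        log.asSequence()
            .filter { it.topic in request.topics }          // only the requested topics
            .filter { it.createdAt.isAfter(request.after) } // strictly newer than the cursor
            .sortedBy { it.createdAt }                      // oldest first
            .take(request.itemsCount)                       // at most one batch
            .toList()
}
```

Each call returns at most `itemsCount` events that are strictly newer than `after`, oldest first. A caller can page through history by passing the `createdAt` of the last event it received.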
Automatic Topic Detection
The framework automatically discovers which topics your consumer needs:
private fun registerConsumer(beanName: String, consumer: Any) {
    val registeredTopics: MutableSet<String> = mutableSetOf()
    for (method in consumer::class.java.methods) {
        // Detect event types from method parameters; skip methods that are not event handlers
        val eventBodyType = extractEventType(method) ?: continue
        // The topic lookup is nullable, so only add topics that resolve
        topicMap[eventBodyType]?.topic()?.let { registeredTopics.add(it) }
        consumerRegistry.register(
            eventType = eventBodyType,
            handler = { event -> method.invoke(consumer, event) }
        )
    }
    bootstrapRegistry.setup(consumer, registeredTopics)
}
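The excerpt relies on an `extractEventType` helper that isn't shown. One way to implement it, under assumptions, is to read the generic type of a handler's single `EventV1<T>` parameter with Java reflection.

The following are illustrative stand-ins:

- the `EventV1` shape;
- the event classes;
- `UserEventsConsumerSketch`;
- `detectTopics`;
- a `topicMap` simplified to map event classes straight to topic names.

```kotlin
import java.lang.reflect.Method
import java.lang.reflect.ParameterizedType

// Stand-ins for the framework envelope and event bodies (assumed shapes).
data class EventV1<T>(val id: String, val body: T)
data class UserCreatedV1(val userId: String)
data class UserStatusUpdatedV1(val userId: String, val status: String)

class UserEventsConsumerSketch {
    fun onCreated(event: EventV1<UserCreatedV1>) {}
    fun onStatusUpdated(event: EventV1<UserStatusUpdatedV1>) {}
}

// Hypothetical helper: returns T for a single-parameter method taking EventV1<T>,
// or null for anything else (toString, equals, ...), so callers can skip it.
fun extractEventType(method: Method): Class<*>? {
    val param = method.genericParameterTypes.singleOrNull() as? ParameterizedType ?: return null
    if (param.rawType != EventV1::class.java) return null
    return param.actualTypeArguments.single() as? Class<*>
}

// Hypothetical topic map from event body class to topic name.
val topicMap: Map<Class<*>, String> = mapOf(
    UserCreatedV1::class.java to "user.model.created.v1",
    UserStatusUpdatedV1::class.java to "user.status.updated.v1",
)

// Collects the topics a consumer needs from its handler signatures.
fun detectTopics(consumer: Any): Set<String> =
    consumer::class.java.methods
        .mapNotNull { extractEventType(it) }
        .mapNotNull { topicMap[it] }
        .toSet()
```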
Progressive Fetching with Retry Logic
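The bootstrap executor fetches events in batches, advancing the record's timestamp after each one, until a batch comes back empty. The retry logic with exponential backoff is not part of this excerpt. Here, a Failure result marks the record as failed and stops the loop: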
private suspend fun executeOne(record: EventBootstrapRecord) {
    var processedTotal = 0
    var hasMore = true
    var currentRecord = record
    while (hasMore) {
        when (val result = fetchAndProcessBatch(currentRecord)) {
            is BatchResult.Success -> {
                if (result.messages.isEmpty()) {
                    hasMore = false
                    markAsDone(currentRecord)
                } else {
                    processedTotal += result.messages.size
                    currentRecord = updateTimestamp(currentRecord, result.messages)
                    log.info {
                        "Processed batch of ${result.messages.size} events. " +
                            "Total: $processedTotal"
                    }
                }
            }
            is BatchResult.Failure -> {
                markAsFailed(currentRecord, result.error)
                hasMore = false
            }
        }
    }
}
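The loop above does not show the retries themselves. Here is a minimal sketch of exponential backoff, assuming a hypothetical `retryWithBackoff` helper wrapped around each batch fetch. It is non-suspending and takes the sleep function as a parameter, so it runs without kotlinx.coroutines and can be tested. A coroutine version would call `delay` instead.

```kotlin
// Hypothetical retry helper; not part of the excerpt above.
// Retries `block` up to maxAttempts times, doubling the wait (by default) between attempts.
fun <T> retryWithBackoff(
    maxAttempts: Int = 5,
    initialDelayMs: Long = 200,
    factor: Double = 2.0,
    maxDelayMs: Long = 10_000,
    sleep: (Long) -> Unit = { ms -> Thread.sleep(ms) },
    block: () -> T,
): Result<T> {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var delayMs = initialDelayMs
    var lastError: Throwable? = null
    repeat(maxAttempts) { attempt ->
        try {
            return Result.success(block())
        } catch (e: Exception) {
            lastError = e
            // Wait only if another attempt follows
            if (attempt < maxAttempts - 1) {
                sleep(delayMs)
                delayMs = minOf((delayMs * factor).toLong(), maxDelayMs)
            }
        }
    }
    return Result.failure(lastError ?: IllegalStateException("no attempts made"))
}
```

With the defaults, the waits between attempts are 200, 400, 800 and 1600 ms, capped at `maxDelayMs`. If every attempt fails, the result is a failure that the executor could turn into `markAsFailed`.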
Progress Tracking
The system tracks progress in a bootstrap collection:
{
  "_id": "3d88ca1e-c166-4870-87fa-54ddd4b8cd77",
  "consumerName": "UserView",
  "status": "DONE",
  "topics": [
    "user.model.created.v1",
    "user.status.updated.v1",
    "user.personal-data.updated.v1"
  ],
  "lastCreatedAt": "2025-10-05T12:39:22.027Z",
  "processedCount": 45
}
Because each record stores lastCreatedAt, the timestamp of the last processed event, you can safely stop and restart your service during bootstrap. It will resume from where it left off.
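Resuming works because the cursor moves forward only after a batch is handled. A hypothetical version of the `updateTimestamp` helper used in the executor loop might look like this. The trimmed record and message types are assumptions.

```kotlin
import java.time.Instant

// Trimmed to the fields this sketch needs (assumed shape).
data class EventBootstrapRecord(
    val consumerName: String,
    val topics: Set<String>,
    val lastCreatedAt: Instant,
    val processedCount: Int = 0,
)

data class Message(val id: String, val createdAt: Instant)

// Hypothetical updateTimestamp: move the cursor to the newest event in the
// batch and add the batch size to the processed count.
fun updateTimestamp(record: EventBootstrapRecord, batch: List<Message>): EventBootstrapRecord {
    val newest = batch.maxOfOrNull { it.createdAt } ?: return record
    return record.copy(
        lastCreatedAt = maxOf(record.lastCreatedAt, newest),
        processedCount = record.processedCount + batch.size,
    )
}
```

After a restart, the next fetch uses `after = record.lastCreatedAt`. Several events can share the same `createdAt`, and a batch boundary may fall between them. In that case a strictly-after filter could skip some events. Fetching at-or-after the cursor and relying on idempotent handlers is one way to stay safe.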
Same Handlers for Historical and Real-Time Events
Here's the beautiful part: historical events use the exact same consumer methods as real-time events.
private fun callConsumers(message: BrokerMessageV1, record: EventBootstrapRecord) {
    consumerRegistry
        .getConsumers(message.body::class)
        .filter { it.bootstrapConsumerName == record.consumerName }
        .forEach { it.handler(message) }
}
Your consumers already handle event ordering and idempotency properly (see the previous post for details), so receiving historical and real-time events simultaneously isn't a problem.
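As a reminder of why overlap is harmless, here is a minimal single-threaded sketch of a read model. It ignores duplicate event IDs and events older than the state it already holds. The `UserEvent` shape and the `UserView` class are hypothetical. A production version would persist this bookkeeping rather than keep it in memory.

```kotlin
import java.time.Instant

// Assumed event shape: a unique id, the user it belongs to, a timestamp, and a payload.
data class UserEvent(val id: String, val userId: String, val createdAt: Instant, val name: String)

// Hypothetical read model that tolerates duplicates and out-of-order delivery,
// so the bootstrap and the live stream may both deliver the same event.
class UserView {
    private val processedIds = mutableSetOf<String>()
    private val names = mutableMapOf<String, Pair<Instant, String>>()

    fun handle(event: UserEvent) {
        if (!processedIds.add(event.id)) return // duplicate: already applied
        val current = names[event.userId]
        if (current != null && current.first >= event.createdAt) return // older than stored state
        names[event.userId] = event.createdAt to event.name
    }

    fun nameOf(userId: String): String? = names[userId]?.second
}
```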
Adding New Features Later
Let's say your Order Service initially subscribed to:
- product.created.v1
- product.price.updated.v1
Later, you need to add product status information to your order table. Simply add a new handler method:
@EventConsumer
@EventBootstrap(
    consumerName = "ProductView",
    storageBean = OrderEventBootstrapStorageAdapter::class,
    clientBean = ProductEventClientV1::class
)
class ProductEventsConsumerV1 {
    // Existing handlers
    fun onCreated(event: EventV1<ProductCreatedV1>) { /* ... */ }
    fun onPriceUpdated(event: EventV1<PriceUpdatedV1>) { /* ... */ }
    // New handler: the framework detects the new topic automatically
    fun onStatusUpdated(event: EventV1<ProductStatusUpdatedV1>) {
        // Your new logic here
    }
}
On the next restart, the framework detects the new topic and fetches historical events for that topic only:
private fun createRecord(bootstrap: EventBootstrap, topics: Set<String>) {
    val existingRecords = storage.get(bootstrap.consumerName)
    val allExistingTopics = existingRecords.flatMap { it.topics }.toSet()
    val newTopics = topics - allExistingTopics
    if (newTopics.isEmpty()) return
    val newRecord = EventBootstrapRecord(
        consumerName = bootstrap.consumerName,
        topics = newTopics,
        lastCreatedAt = Instant.EPOCH,
        status = EventBootstrapStatus.IN_PROGRESS
    )
    storage.save(newRecord)
}
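Here is the same set-difference logic, extracted into a pure function (`newRecordFor`, with simplified hypothetical types), applied to the ProductView scenario. Existing records already cover the creation and price topics, so only the status topic gets a new record, starting from `Instant.EPOCH`. The topic name `product.status.updated.v1` is an assumed example, since the post doesn't name it.

```kotlin
import java.time.Instant

enum class EventBootstrapStatus { PENDING, IN_PROGRESS, DONE, FAILED }

// Simplified record (assumed shape).
data class EventBootstrapRecord(
    val consumerName: String,
    val topics: Set<String>,
    val lastCreatedAt: Instant,
    val status: EventBootstrapStatus,
)

// Returns a record for the topics no existing record covers, or null if there are none.
fun newRecordFor(
    consumerName: String,
    detectedTopics: Set<String>,
    existing: List<EventBootstrapRecord>,
): EventBootstrapRecord? {
    val covered = existing.flatMap { it.topics }.toSet()
    val newTopics = detectedTopics - covered
    if (newTopics.isEmpty()) return null
    return EventBootstrapRecord(
        consumerName = consumerName,
        topics = newTopics,
        lastCreatedAt = Instant.EPOCH, // new topics start from the beginning of history
        status = EventBootstrapStatus.IN_PROGRESS,
    )
}
```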
Perfect for Continuous Integration and Deployment
This approach works well with continuous deployment:
- First iteration: Create your service with initial event subscriptions
- Deploy: The service starts, bootstraps historical events, and begins receiving real-time events
- Later iteration: Add new event handlers for new features
- Deploy again: The service automatically fetches only the new historical events
No manual migrations. No coordination meetings. No data import scripts. Just deploy and let the framework handle it.
Bootstrap States
The system tracks four states:
enum class EventBootstrapStatus {
    PENDING,     // Ready to start
    IN_PROGRESS, // Currently fetching
    DONE,        // Successfully completed
    FAILED       // Error occurred
}
You can also control automatic startup:
@EventBootstrap(
    consumerName = "UserView",
    storageBean = OrderEventBootstrapStorageAdapter::class,
    clientBean = UserEventClientV1::class,
    autoStart = false // Start manually via endpoint
)
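How the executor treats each state on startup isn't spelled out here. The following is only one plausible policy, sketched with hypothetical names (`BootstrapRecord`, `recordsToStartOnBoot`):

- With `autoStart` enabled, PENDING records start and IN_PROGRESS records resume.
- DONE records are skipped.
- FAILED records wait for a manual trigger.
- With `autoStart = false`, nothing starts automatically.

```kotlin
enum class EventBootstrapStatus { PENDING, IN_PROGRESS, DONE, FAILED }

data class BootstrapRecord(val consumerName: String, val status: EventBootstrapStatus)

// Hypothetical startup rule; the real framework's policy may differ.
// PENDING records start, IN_PROGRESS records resume after a restart,
// DONE records are skipped, and FAILED records are left for a manual trigger.
fun recordsToStartOnBoot(records: List<BootstrapRecord>, autoStart: Boolean): List<BootstrapRecord> =
    if (!autoStart) {
        emptyList()
    } else {
        records.filter {
            it.status == EventBootstrapStatus.PENDING || it.status == EventBootstrapStatus.IN_PROGRESS
        }
    }
```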
Summary
Event bootstrapping solves the historical data problem elegantly by:
- Reusing the same event handlers for historical and real-time events
- Automatically detecting which events your consumers need
- Tracking progress so restarts don't lose work
- Handling failures with retry logic and backoff
- Supporting incremental changes when adding new features
- Working seamlessly in both modular monoliths (internal clients) and distributed architectures (remote clients)
The result? Your services can truly be independent, with complete data from day one, and the ability to evolve without painful migrations.
Source Code
Full implementation is available in the VibeTDD GitLab repository.