ts-event-core
This project is an implementation of Event Sourcing, written in TypeScript using functional programming conventions. It contains a set of loosely coupled components which can be interchanged and composed together.
Example domain
Our example domain comes from the airline industry. We’ve been tasked with figuring out how to notify passengers when flights are delayed. We’ve been given a few basic requirements:
- We should be able to schedule flights.
- Passengers can purchase tickets to those flights and set notification preferences on their account.
- When flights are delayed, all ticket holders are sent either an SMS or email, depending on their preferences.
We will start by defining our domain, then explore how it’s consumed by components of the framework.
Aggregate roots
A domain starts with a declaration of the aggregate roots. Each aggregate root has an identifier, in this case FLIGHT and PASSENGER. This object represents a bundle of all the code contained within the domain and is later consumed by components of the framework.
export const airlineAggregateRoots = {
  FLIGHT: flightAggregateRoot,
  PASSENGER: passengerAggregateRoot,
};
An aggregate root definition contains a map of commands and state, describing the business rules.
export const flightAggregateRoot = {
  state: {
    version: 1,
    initialState: { status: "NOT_YET_SCHEDULED" },
    reducer: flightReducer,
  },
  commands: {
    scheduleFlight,
    purchaseTicket,
    delayFlight,
  },
} satisfies AggregateRootDefinition<FlightState, FlightEvent>;
State
The main component of state, a reducer, is responsible for creating a useful decision model out of the events raised. A reducer will only ever process a single stream of events from a specific aggregate root (or a specific flight in our case).
In this case we’re keeping track of the total number of seats we’re allowed to sell as tickets are purchased, so that we don’t accidentally overbook a flight.
export function flightReducer(state: FlightState, event: FlightEvent): FlightState {
  switch (event.type) {
    case "FLIGHT_SCHEDULED": {
      return {
        status: "SCHEDULED",
        totalSeats: event.sellableSeats,
        totalAvailableSeats: event.sellableSeats,
        totalSeatsSold: 0,
        passengerManifest: [],
      };
    }
    case "TICKET_PURCHASED": {
      assertFlightScheduled(state);
      return {
        ...state,
        totalSeatsSold: state.totalSeatsSold + 1,
        totalAvailableSeats: state.totalAvailableSeats - 1,
        passengerManifest: [...state.passengerManifest, event.passengerId],
      };
    }
  }
  return state;
}
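To make the reducer's role concrete, here is a minimal, self-contained sketch of how a decision model can be derived by folding one flight's event history through a reducer. The FlightState and FlightEvent types below are simplified stand-ins, and reduceFlightSketch is a hypothetical name. This illustrates the idea only and is not the framework's actual loading code.

```typescript
// Simplified stand-ins for the domain types (assumptions, not the real definitions).
type FlightState =
  | { status: "NOT_YET_SCHEDULED" }
  | { status: "SCHEDULED"; totalAvailableSeats: number; totalSeatsSold: number };

type FlightEvent =
  | { type: "FLIGHT_SCHEDULED"; sellableSeats: number }
  | { type: "TICKET_PURCHASED"; passengerId: string };

// Hypothetical reducer with the same shape as flightReducer: (state, event) => state.
function reduceFlightSketch(state: FlightState, event: FlightEvent): FlightState {
  switch (event.type) {
    case "FLIGHT_SCHEDULED":
      return { status: "SCHEDULED", totalAvailableSeats: event.sellableSeats, totalSeatsSold: 0 };
    case "TICKET_PURCHASED":
      if (state.status !== "SCHEDULED") {
        throw new Error("Ticket purchased before the flight was scheduled");
      }
      return {
        ...state,
        totalAvailableSeats: state.totalAvailableSeats - 1,
        totalSeatsSold: state.totalSeatsSold + 1,
      };
    default:
      return state;
  }
}

// The event stream for a single flight, oldest first.
const flightHistory: FlightEvent[] = [
  { type: "FLIGHT_SCHEDULED", sellableSeats: 3 },
  { type: "TICKET_PURCHASED", passengerId: "passenger-1" },
  { type: "TICKET_PURCHASED", passengerId: "passenger-2" },
];

// Folding the history from the initial state yields the current decision model.
const currentFlight = flightHistory.reduce<FlightState>(
  reduceFlightSketch,
  { status: "NOT_YET_SCHEDULED" },
);
```

After the fold, currentFlight is a scheduled flight with one seat left, which is exactly the information a command needs to decide whether another ticket can be sold.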
Commands
Commands are pure functions. They receive a state object and command data from the issuer as arguments. They return an event (or array of events), which occurred as a result of processing the command.
Since these are pure functions, any functional programming techniques can be applied here. A lot of the commands in this domain require that a flight has already been scheduled, so the withScheduledFlight higher-order function (HOF) takes care of this check for us. It returns the given failure event (here TICKET_PURCHASED_FAILED) on our behalf and narrows the FlightState argument into ScheduledFlightState.
export const purchaseTicket = withScheduledFlight("TICKET_PURCHASED_FAILED", (
  flight: ScheduledFlightState,
  { passengerId, purchasePriceAudCents }: {
    passengerId: string;
    purchasePriceAudCents: number;
  },
): FlightEvent => {
  if (flight.totalAvailableSeats === 0) {
    return {
      type: "TICKET_PURCHASED_FAILED",
      reason: "NO_AVAILABLE_SEATS",
    };
  }
  return {
    type: "TICKET_PURCHASED",
    passengerId: passengerId,
    purchasePrice: {
      currency: "AUD",
      cents: purchasePriceAudCents,
    },
  };
});
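The source of withScheduledFlight is not shown here, so the following is only a sketch of how such a higher-order function could be written. The name withScheduledFlightSketch, the simplified state types and the "FLIGHT_NOT_YET_SCHEDULED" reason are all assumptions made for illustration.

```typescript
// Simplified stand-ins for the domain's state types (assumptions).
type NotYetScheduledState = { status: "NOT_YET_SCHEDULED" };
type ScheduledFlightState = { status: "SCHEDULED"; totalAvailableSeats: number };
type FlightState = NotYetScheduledState | ScheduledFlightState;

type FailureEvent<TType extends string> = { type: TType; reason: string };

// Wraps a command that needs a scheduled flight. The wrapper accepts any
// FlightState, returns a failure event if the flight is not scheduled, and
// otherwise calls the inner command with the narrowed state.
function withScheduledFlightSketch<TFail extends string, TData, TEvent>(
  failureType: TFail,
  command: (flight: ScheduledFlightState, data: TData) => TEvent,
): (flight: FlightState, data: TData) => TEvent | FailureEvent<TFail> {
  return (flight, data) => {
    if (flight.status !== "SCHEDULED") {
      // Hypothetical reason string; the real one may differ.
      return { type: failureType, reason: "FLIGHT_NOT_YET_SCHEDULED" };
    }
    return command(flight, data);
  };
}

type TicketEvent =
  | { type: "TICKET_PURCHASED"; passengerId: string }
  | FailureEvent<"TICKET_PURCHASED_FAILED">;

// The inner command only ever sees a ScheduledFlightState.
const purchaseTicketSketch = withScheduledFlightSketch(
  "TICKET_PURCHASED_FAILED",
  (flight: ScheduledFlightState, data: { passengerId: string }): TicketEvent =>
    flight.totalAvailableSeats === 0
      ? { type: "TICKET_PURCHASED_FAILED", reason: "NO_AVAILABLE_SEATS" }
      : { type: "TICKET_PURCHASED", passengerId: data.passengerId },
);

const rejectedPurchase = purchaseTicketSketch(
  { status: "NOT_YET_SCHEDULED" },
  { passengerId: "passenger-1" },
);
const acceptedPurchase = purchaseTicketSketch(
  { status: "SCHEDULED", totalAvailableSeats: 1 },
  { passengerId: "passenger-1" },
);
```

Because the wrapper and the command are both pure, the check composes without any framework involvement, and each command can be unit tested by calling it with a plain state object.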
Process manager
A process manager facilitates coordination between aggregates. In this case, when a FLIGHT_DELAYED event is raised, we must issue a command for each of the impacted passengers, to notify them of the delay.
Command processing within a single aggregate is considered strongly consistent: data passed into each command is guaranteed to be up-to-date, and the outcome of a command will never be committed if it conflicts with any commands running in parallel for the same aggregate.
The same does not apply between aggregates. If a flight is delayed the moment a ticket is purchased, the passenger may or may not be notified of the delay. Passengers are notified of delays at some point in the future, after a delay has been recorded. These tradeoffs are acceptable in our sample domain.
Features which require strong consistency (such as not overbooking a flight) must be validated within a single aggregate root. If consistency issues are a headache in a given domain, it may be a sign that aggregate roots need to be less granular. The granularity of aggregates is generally a trade-off between parallelism and throughput on one side and consistency on the other. In this case, passengers updating their notification preferences can happen in parallel to flight scheduling and ticket purchasing.
export async function flightDelayProcessManager(
  { event, issueCommand }: {
    event: AirlineDomainEvent;
    issueCommand: CommandIssuer<typeof airlineAggregateRoots>;
  },
) {
  if (event.payload.type === "FLIGHT_DELAYED") {
    const delayedUntil = event.payload.delayedUntil;
    await Promise.all(event.payload.impactedPassengerIds.map(async (impactedPassenger) => {
      await issueCommand({
        aggregateRootType: "PASSENGER",
        command: "notifyOfFlightDelay",
        aggregateRootId: impactedPassenger,
        data: {
          flightNumber: event.aggregateRootId,
          delayedUntil,
        },
      });
    }));
  }
}
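The strong consistency guarantee for a single aggregate, described earlier in this section, is often achieved with an expected-version check when events are appended (optimistic concurrency). Whether ts-event-core does exactly this is not stated here, so treat the following as a sketch of the general technique. createVersionedStoreSketch and its methods are hypothetical and are not the library's EventStore interface.

```typescript
type StoredEventSketch = { type: string };

// Hypothetical in-memory store keyed by aggregate root id.
function createVersionedStoreSketch() {
  const streams = new Map<string, StoredEventSketch[]>();
  return {
    // Returns a copy of the stream plus its version (the number of events so far).
    load(aggregateRootId: string): { events: StoredEventSketch[]; version: number } {
      const events = streams.get(aggregateRootId) ?? [];
      return { events: [...events], version: events.length };
    },
    // Commits only if nothing was written since `expectedVersion` was read.
    append(
      aggregateRootId: string,
      expectedVersion: number,
      newEvents: StoredEventSketch[],
    ): boolean {
      const events = streams.get(aggregateRootId) ?? [];
      if (events.length !== expectedVersion) {
        return false; // A conflicting command committed first.
      }
      streams.set(aggregateRootId, [...events, ...newEvents]);
      return true;
    },
  };
}

const versionedStore = createVersionedStoreSketch();

// Two commands for flight SB93 load the same (empty) stream in parallel.
const readA = versionedStore.load("SB93");
const readB = versionedStore.load("SB93");

// The first commit wins; the second is rejected because the stream has moved on.
const firstCommitted = versionedStore.append("SB93", readA.version, [{ type: "TICKET_PURCHASED" }]);
const secondCommitted = versionedStore.append("SB93", readB.version, [{ type: "TICKET_PURCHASED" }]);
```

A rejected command would typically be retried against freshly loaded state. No such check spans two different aggregates, which is why coordination between a flight and its passengers is only eventually consistent.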
Projections
With our domain producing events, we can extract valuable insights from them. Projections (or read models) are data structures built from our event store that can answer questions about our data or satisfy certain access patterns.
In this case, we’ve been tasked with producing a lifetime earnings report, summing the purchase price of all tickets ever sold. Like our decision model, this is also structured as a reducer, but there is no hard requirement for where this data is stored or how the data structure is built.
export const lifetimeEarningsReport: LifetimeEarningsReportProjection = {
  initialState: {
    lifetimeEarningsCents: 0,
  },
  reducer: (state: LifetimeEarningsReport, event: AirlineDomainEvent) => ({
    lifetimeEarningsCents: state.lifetimeEarningsCents +
      (event.payload.type === "TICKET_PURCHASED" ? event.payload.purchasePrice.cents : 0),
  }),
};
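As a rough illustration of how a reducer-style projection turns events into a read model, the sketch below folds a handful of events into a report. The event shape is a simplified stand-in for AirlineDomainEvent, and the names earningsProjectionSketch and sampleAirlineEvents are hypothetical. How the framework actually feeds events to a projection is not shown here.

```typescript
type PurchasePrice = { currency: "AUD"; cents: number };

// Simplified stand-in for the domain event envelope (an assumption).
type AirlineEventSketch = {
  aggregateRootId: string;
  payload:
    | { type: "TICKET_PURCHASED"; passengerId: string; purchasePrice: PurchasePrice }
    | { type: "FLIGHT_DELAYED" };
};

type EarningsReport = { lifetimeEarningsCents: number };

const earningsProjectionSketch = {
  initialState: { lifetimeEarningsCents: 0 } as EarningsReport,
  // Only TICKET_PURCHASED events contribute to the total; others add 0.
  reducer: (state: EarningsReport, event: AirlineEventSketch): EarningsReport => ({
    lifetimeEarningsCents: state.lifetimeEarningsCents +
      (event.payload.type === "TICKET_PURCHASED" ? event.payload.purchasePrice.cents : 0),
  }),
};

// Events from any flight can be mixed together: the projection reads the whole store.
const sampleAirlineEvents: AirlineEventSketch[] = [
  {
    aggregateRootId: "SB93",
    payload: {
      type: "TICKET_PURCHASED",
      passengerId: "passenger-1",
      purchasePrice: { currency: "AUD", cents: 19900 },
    },
  },
  { aggregateRootId: "SB93", payload: { type: "FLIGHT_DELAYED" } },
  {
    aggregateRootId: "SB94",
    payload: {
      type: "TICKET_PURCHASED",
      passengerId: "passenger-2",
      purchasePrice: { currency: "AUD", cents: 24900 },
    },
  },
];

const earningsReport = sampleAirlineEvents.reduce<EarningsReport>(
  earningsProjectionSketch.reducer,
  earningsProjectionSketch.initialState,
);
```

Unlike the flight reducer, this fold runs over events from every aggregate, so the resulting report (44800 cents for the sample above) is a cross-cutting view rather than a decision model.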
Bootstrap
To bootstrap a domain into a working application, key framework components need to be composed together with the domain. All components have in-memory implementations, which allow for fast integration testing, and, where applicable, persistent implementations, which are better suited to production.
The framework does not dictate the shape or properties of a bootstrap, but instead provides a library of underlying components which should be composed together depending on the use case.
The event-sourcing-bootstrap.test.ts test is a reference for initializing an in-memory bootstrap and a production bootstrap, both of which pass the same integration test case. The in-memory bootstrap runs ~10x faster, allowing for fast integration testing feedback with relatively high fidelity to a production bootstrap.
The test case begins with scheduling a flight:
it("allows us to schedule a flight", async () => {
  await issueCommand({
    aggregateRootType: "FLIGHT",
    aggregateRootId: "SB93",
    command: "scheduleFlight",
    data: {
      departureTime: new Date("2035-01-01T05:00:00Z"),
      sellableSeats: 3,
    },
  });
});
...and concludes by asserting that notifications for a delay were sent to the correct passengers, through the correct channels:
it("ensures notifications are sent to affected passengers through the correct channel", async () => {
  await tryThing(() =>
    assertArrayIncludes(notificationLog, [
      "EMAIL: sam@example.com Flight delayed Hi, Flight SB93 has been delayed. Sorry about that.",
      "SMS: +61491570158 Uh-oh! Flight SB93 has been delayed... we're sorry :(",
    ])
  );
});
Key framework components
CommandIssuer
A CommandIssuer is responsible for receiving commands, preparing the required state, executing commands and then persisting the outcome.
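The four responsibilities listed above (receive, prepare state, execute, persist) can be sketched in a few lines. This is a deliberately reduced illustration: it is synchronous, handles a single hard-coded command, and keeps events in a Map. The names createIssuerSketch, seatReducer and purchaseSeat are hypothetical, and the real CommandIssuer is asynchronous and routes by aggregate root type and command name.

```typescript
type SeatState = { seatsLeft: number };
type SeatEvent = { type: "TICKET_PURCHASED" } | { type: "TICKET_PURCHASED_FAILED" };

// Reducer: builds the decision model from past events.
const seatReducer = (state: SeatState, event: SeatEvent): SeatState =>
  event.type === "TICKET_PURCHASED" ? { seatsLeft: state.seatsLeft - 1 } : state;

// Command: a pure function from state to the event that occurred.
const purchaseSeat = (state: SeatState): SeatEvent =>
  state.seatsLeft > 0 ? { type: "TICKET_PURCHASED" } : { type: "TICKET_PURCHASED_FAILED" };

function createIssuerSketch(initialState: SeatState) {
  // Hypothetical in-memory event streams, one per aggregate root id.
  const streams = new Map<string, SeatEvent[]>();
  return {
    issuePurchase(aggregateRootId: string): SeatEvent {
      // 1. Prepare state by replaying the aggregate's own event stream.
      const pastEvents = streams.get(aggregateRootId) ?? [];
      const state = pastEvents.reduce<SeatState>(seatReducer, initialState);
      // 2. Execute the command against that state.
      const outcome = purchaseSeat(state);
      // 3. Persist the outcome so the next command sees it.
      streams.set(aggregateRootId, [...pastEvents, outcome]);
      return outcome;
    },
  };
}

const seatIssuer = createIssuerSketch({ seatsLeft: 1 });
const firstOutcome = seatIssuer.issuePurchase("SB93");
const secondOutcome = seatIssuer.issuePurchase("SB93");
const otherFlightOutcome = seatIssuer.issuePurchase("SB94");
```

With one seat available, the first purchase for SB93 succeeds and the second fails, while SB94 is unaffected because each aggregate root id has its own stream.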
Implementations
AggregateRootRepository
An AggregateRootRepository is responsible for loading aggregate state and persisting any pending events which were recorded as the result of processing a command.
Implementations
SnapshotStorage
Some aggregate roots contain a large number of events. Components like createSnapshottingAggregateRootRepository can persist a snapshot of aggregate state to avoid needing to load large streams of events. Snapshots can be stored in memory or be persistent.
The aggregateRootDefinition.state.version is a mechanism for versioning state, as the underlying reducer evolves.
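One plausible way to use a state version is to treat any snapshot written under a different version as stale and fall back to a full replay. The sketch below assumes that behaviour. loadStateSketch, SnapshotSketch and the eventCount field are hypothetical names, and the real createSnapshottingAggregateRootRepository may work differently.

```typescript
type StateDefinitionSketch<S, E> = {
  version: number;
  initialState: S;
  reducer: (state: S, event: E) => S;
};

// A snapshot records which state version produced it and how many events it covers.
type SnapshotSketch<S> = { stateVersion: number; eventCount: number; state: S };

function loadStateSketch<S, E>(
  definition: StateDefinitionSketch<S, E>,
  events: E[],
  snapshot?: SnapshotSketch<S>,
): { state: S; replayedEvents: number } {
  // A snapshot is only usable if it was built by the current reducer version.
  const usable = snapshot !== undefined && snapshot.stateVersion === definition.version
    ? snapshot
    : undefined;
  const startState = usable ? usable.state : definition.initialState;
  // Replay only the events the snapshot has not already folded in.
  const remaining = events.slice(usable ? usable.eventCount : 0);
  return {
    state: remaining.reduce<S>(definition.reducer, startState),
    replayedEvents: remaining.length,
  };
}

type SoldState = { ticketsSold: number };
type SoldEvent = { type: "TICKET_PURCHASED" };

const countSold = (state: SoldState, _event: SoldEvent): SoldState => ({
  ticketsSold: state.ticketsSold + 1,
});
const soldEvents: SoldEvent[] = Array.from(
  { length: 5 },
  (): SoldEvent => ({ type: "TICKET_PURCHASED" }),
);
const storedSnapshot: SnapshotSketch<SoldState> = {
  stateVersion: 1,
  eventCount: 3,
  state: { ticketsSold: 3 },
};

// Same version: start from the snapshot and replay the last two events.
const fromSnapshot = loadStateSketch(
  { version: 1, initialState: { ticketsSold: 0 }, reducer: countSold },
  soldEvents,
  storedSnapshot,
);
// Bumped version: the snapshot is ignored and all five events are replayed.
const afterVersionBump = loadStateSketch(
  { version: 2, initialState: { ticketsSold: 0 }, reducer: countSold },
  soldEvents,
  storedSnapshot,
);
```

Both calls arrive at the same state. The difference is only in how many events had to be replayed, which is the cost a snapshot is meant to avoid.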
Implementations
EventStore
The EventStore retrieves and persists events.
Implementations
Projector
Projectors take a stream of events from an event store and transform them into useful data structures. These are often called read models. Read models are considered eventually consistent and can be created or deleted as required.
These data structures can be stored in memory, relational databases, speciality databases or any other system.
For these reasons, the signature of a projection is extremely simple: the only contract that needs to be fulfilled is providing a stream of events. How data is reduced, retrieved or accessed beyond that depends on the use case.